tetsy_kvdb_rocksdb/
lib.rs

1// Copyright 2020 Parity Technologies
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9mod iter;
10mod stats;
11
12use std::{cmp, collections::HashMap, convert::identity, error, fs, io, mem, path::Path, result};
13
14use tetsy_util_mem::MallocSizeOf;
15use parking_lot::RwLock;
16use rocksdb::{
17	BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Error, Options, ReadOptions, WriteBatch, WriteOptions, DB,
18};
19
20use crate::iter::KeyValuePair;
21use fs_swap::{swap, swap_nonatomic};
22use tetsy_kvdb::{DBOp, DBTransaction, DBValue, KeyValueDB};
23use log::{debug, warn};
24
25#[cfg(target_os = "linux")]
26use regex::Regex;
27#[cfg(target_os = "linux")]
28use std::fs::File;
29#[cfg(target_os = "linux")]
30use std::path::PathBuf;
31#[cfg(target_os = "linux")]
32use std::process::Command;
33
/// Wrap any error that can be boxed as a `std::error::Error` into an
/// `io::Error` of kind `Other`; the original error becomes the payload
/// (retrievable through `io::Error::get_ref`).
fn other_io_err<E: Into<Box<dyn error::Error + Send + Sync>>>(e: E) -> io::Error {
	let kind = io::ErrorKind::Other;
	io::Error::new(kind, e)
}
40
/// Unit of the memory budget settings: a count of mebibytes.
type MiB = usize;

const KB: usize = 1_024;
const MB: usize = KB * 1_024;

/// The default memory budget of a single column, in MiB.
pub const DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB: MiB = 128;

/// The default overall memory budget, in MiB.
pub const DB_DEFAULT_MEMORY_BUDGET_MB: MiB = 512;
52
/// Compaction profile for the database settings.
///
/// Note that changing these parameters may trigger
/// the compaction process of RocksDB on startup.
/// See <https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is-true>.
#[derive(Clone, Copy, PartialEq, Debug)]
pub struct CompactionProfile {
	/// L0-L1 target file size, in bytes (used as RocksDB's target file size base).
	/// The minimum size should be calculated in accordance with the
	/// number of levels and the expected size of the database.
	pub initial_file_size: u64,
	/// Block size of the block-based table format, in bytes.
	pub block_size: usize,
}
66
67impl Default for CompactionProfile {
68	/// Default profile suitable for most storage
69	fn default() -> CompactionProfile {
70		CompactionProfile::ssd()
71	}
72}
73
74/// Given output of df command return Linux rotational flag file path.
75#[cfg(target_os = "linux")]
76pub fn rotational_from_df_output(df_out: Vec<u8>) -> Option<PathBuf> {
77	use std::str;
78	str::from_utf8(df_out.as_slice())
79		.ok()
80		// Get the drive name.
81		.and_then(|df_str| {
82			Regex::new(r"/dev/(sd[:alpha:]{1,2})")
83				.ok()
84				.and_then(|re| re.captures(df_str))
85				.and_then(|captures| captures.get(1))
86		})
87		// Generate path e.g. /sys/block/sda/queue/rotational
88		.map(|drive_path| {
89			let mut p = PathBuf::from("/sys/block");
90			p.push(drive_path.as_str());
91			p.push("queue/rotational");
92			p
93		})
94}
95
96impl CompactionProfile {
97	/// Attempt to determine the best profile automatically, only Linux for now.
98	#[cfg(target_os = "linux")]
99	pub fn auto(db_path: &Path) -> CompactionProfile {
100		use std::io::Read;
101		let hdd_check_file = db_path
102			.to_str()
103			.and_then(|path_str| Command::new("df").arg(path_str).output().ok())
104			.and_then(|df_res| if df_res.status.success() { Some(df_res.stdout) } else { None })
105			.and_then(rotational_from_df_output);
106		// Read out the file and match compaction profile.
107		if let Some(hdd_check) = hdd_check_file {
108			if let Ok(mut file) = File::open(hdd_check.as_path()) {
109				let mut buffer = [0; 1];
110				if file.read_exact(&mut buffer).is_ok() {
111					// 0 means not rotational.
112					if buffer == [48] {
113						return Self::ssd();
114					}
115					// 1 means rotational.
116					if buffer == [49] {
117						return Self::hdd();
118					}
119				}
120			}
121		}
122		// Fallback if drive type was not determined.
123		Self::default()
124	}
125
126	/// Just default for other platforms.
127	#[cfg(not(target_os = "linux"))]
128	pub fn auto(_db_path: &Path) -> CompactionProfile {
129		Self::default()
130	}
131
132	/// Default profile suitable for SSD storage
133	pub fn ssd() -> CompactionProfile {
134		CompactionProfile { initial_file_size: 64 * MB as u64, block_size: 16 * KB }
135	}
136
137	/// Slow HDD compaction profile
138	pub fn hdd() -> CompactionProfile {
139		CompactionProfile { initial_file_size: 256 * MB as u64, block_size: 64 * KB }
140	}
141}
142
/// Database configuration.
#[derive(Clone)]
pub struct DatabaseConfig {
	/// Max number of open files.
	/// Ignored for secondary instances, which always use `-1`.
	pub max_open_files: i32,
	/// Memory budget (in MiB) used for setting block cache size and
	/// write buffer size for each column including the default one.
	/// Keys are column indices.
	/// If the memory budget of a column is not specified,
	/// `DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB` is used for that column.
	pub memory_budget: HashMap<u32, MiB>,
	/// Compaction profile (target file size and block size).
	pub compaction: CompactionProfile,
	/// Set number of columns.
	///
	/// The number of columns must not be zero: `DatabaseConfig::with_columns`
	/// and `Database::open` panic otherwise.
	pub columns: u32,
	/// Specify the maximum number of info/debug log files to be kept.
	pub keep_log_file_num: i32,
	/// Enable native RocksDB statistics.
	/// Disabled by default.
	///
	/// It can have a negative performance impact up to 10% according to
	/// <https://github.com/facebook/rocksdb/wiki/Statistics>.
	pub enable_statistics: bool,
	/// Open the database as a secondary instance.
	/// Specify a path for the secondary instance of the database.
	/// Secondary instances are read-only and kept updated by tailing the rocksdb MANIFEST.
	/// It is up to the user to call `try_catch_up_with_primary()` manually to update the secondary db.
	/// Disabled by default.
	///
	/// `max_open_files` is overridden to always equal `-1`.
	/// May have a negative performance impact on the secondary instance
	/// if the secondary instance reads and applies state changes before the primary instance compacts them.
	/// More info: <https://github.com/facebook/rocksdb/wiki/Secondary-instance>
	pub secondary: Option<String>,
}
181
182impl DatabaseConfig {
183	/// Create new `DatabaseConfig` with default parameters and specified set of columns.
184	/// Note that cache sizes must be explicitly set.
185	///
186	/// # Safety
187	///
188	/// The number of `columns` must not be zero.
189	pub fn with_columns(columns: u32) -> Self {
190		assert!(columns > 0, "the number of columns must not be zero");
191
192		Self { columns, ..Default::default() }
193	}
194
195	/// Returns the total memory budget in bytes.
196	pub fn memory_budget(&self) -> MiB {
197		(0..self.columns).map(|i| self.memory_budget.get(&i).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB).sum()
198	}
199
200	/// Returns the memory budget of the specified column in bytes.
201	fn memory_budget_for_col(&self, col: u32) -> MiB {
202		self.memory_budget.get(&col).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB
203	}
204
205	// Get column family configuration with the given block based options.
206	fn column_config(&self, block_opts: &BlockBasedOptions, col: u32) -> Options {
207		let column_mem_budget = self.memory_budget_for_col(col);
208		let mut opts = Options::default();
209
210		opts.set_level_compaction_dynamic_level_bytes(true);
211		opts.set_block_based_table_factory(block_opts);
212		opts.optimize_level_style_compaction(column_mem_budget);
213		opts.set_target_file_size_base(self.compaction.initial_file_size);
214		opts.set_compression_per_level(&[]);
215
216		opts
217	}
218}
219
220impl Default for DatabaseConfig {
221	fn default() -> DatabaseConfig {
222		DatabaseConfig {
223			max_open_files: 512,
224			memory_budget: HashMap::new(),
225			compaction: CompactionProfile::default(),
226			columns: 1,
227			keep_log_file_num: 1,
228			enable_statistics: false,
229			secondary: None,
230		}
231	}
232}
233
/// An open RocksDB handle together with the names of its column families.
///
/// Column index `i` maps to the column family named `column_names[i]`
/// (`Database::open` and `Database::add_column` name them `col0`, `col1`, ...).
struct DBAndColumns {
	db: DB,
	column_names: Vec<String>,
}
238
239impl MallocSizeOf for DBAndColumns {
240	fn size_of(&self, ops: &mut tetsy_util_mem::MallocSizeOfOps) -> usize {
241		let mut total = self.column_names.size_of(ops)
242			// we have at least one column always, so we can call property on it
243			+ self.db
244				.property_int_value_cf(self.cf(0), "rocksdb.block-cache-usage")
245				.unwrap_or(Some(0))
246				.map(|x| x as usize)
247				.unwrap_or(0);
248
249		for v in 0..self.column_names.len() {
250			total += self.static_property_or_warn(v, "rocksdb.estimate-table-readers-mem");
251			total += self.static_property_or_warn(v, "rocksdb.cur-size-all-mem-tables");
252		}
253
254		total
255	}
256}
257
258impl DBAndColumns {
259	fn cf(&self, i: usize) -> &ColumnFamily {
260		self.db.cf_handle(&self.column_names[i]).expect("the specified column name is correct; qed")
261	}
262
263	fn static_property_or_warn(&self, col: usize, prop: &str) -> usize {
264		match self.db.property_int_value_cf(self.cf(col), prop) {
265			Ok(Some(v)) => v as usize,
266			_ => {
267				warn!("Cannot read expected static property of RocksDb database: {}", prop);
268				0
269			}
270		}
271	}
272}
273
/// Key-Value database backed by RocksDB, with column families addressed by index.
#[derive(MallocSizeOf)]
pub struct Database {
	/// The open RocksDB handle and its column names; `None` once the database has been closed.
	db: RwLock<Option<DBAndColumns>>,
	/// Configuration the database was opened with; reused by `restore` and `add_column`.
	#[ignore_malloc_size_of = "insignificant"]
	config: DatabaseConfig,
	/// Path of the database directory.
	path: String,
	/// Options the database was opened with; `get_statistics` reads from these.
	#[ignore_malloc_size_of = "insignificant"]
	opts: Options,
	/// Options used for every write batch.
	#[ignore_malloc_size_of = "insignificant"]
	write_opts: WriteOptions,
	/// Options used for point reads.
	#[ignore_malloc_size_of = "insignificant"]
	read_opts: ReadOptions,
	/// Block based table options shared by all columns, including columns added later.
	#[ignore_malloc_size_of = "insignificant"]
	block_opts: BlockBasedOptions,
	/// Running I/O counters reported through `io_stats`.
	#[ignore_malloc_size_of = "insignificant"]
	stats: stats::RunningDbStats,
}
292
293#[inline]
294fn check_for_corruption<T, P: AsRef<Path>>(path: P, res: result::Result<T, Error>) -> io::Result<T> {
295	if let Err(ref s) = res {
296		if is_corrupted(s) {
297			warn!("DB corrupted: {}. Repair will be triggered on next restart", s);
298			let _ = fs::File::create(path.as_ref().join(Database::CORRUPTION_FILE_NAME));
299		}
300	}
301
302	res.map_err(other_io_err)
303}
304
305fn is_corrupted(err: &Error) -> bool {
306	err.as_ref().starts_with("Corruption:")
307		|| err.as_ref().starts_with("Invalid argument: You have to open all column families")
308}
309
310/// Generate the options for RocksDB, based on the given `DatabaseConfig`.
311fn generate_options(config: &DatabaseConfig) -> Options {
312	let mut opts = Options::default();
313
314	opts.set_report_bg_io_stats(true);
315	if config.enable_statistics {
316		opts.enable_statistics();
317	}
318	opts.set_use_fsync(false);
319	opts.create_if_missing(true);
320	if config.secondary.is_some() {
321		opts.set_max_open_files(-1)
322	} else {
323		opts.set_max_open_files(config.max_open_files);
324	}
325	opts.set_bytes_per_sync(1 * MB as u64);
326	opts.set_keep_log_file_num(1);
327	opts.increase_parallelism(cmp::max(1, num_cpus::get() as i32 / 2));
328
329	opts
330}
331
332fn generate_read_options() -> ReadOptions {
333	let mut read_opts = ReadOptions::default();
334	read_opts.set_verify_checksums(false);
335	read_opts
336}
337
338/// Generate the block based options for RocksDB, based on the given `DatabaseConfig`.
339fn generate_block_based_options(config: &DatabaseConfig) -> io::Result<BlockBasedOptions> {
340	let mut block_opts = BlockBasedOptions::default();
341	block_opts.set_block_size(config.compaction.block_size);
342	// See https://github.com/facebook/rocksdb/blob/a1523efcdf2f0e8133b9a9f6e170a0dad49f928f/include/rocksdb/table.h#L246-L271 for details on what the format versions are/do.
343	block_opts.set_format_version(5);
344	block_opts.set_block_restart_interval(16);
345	// Set cache size as recommended by
346	// https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#block-cache-size
347	let cache_size = config.memory_budget() / 3;
348	if cache_size == 0 {
349		block_opts.disable_cache()
350	} else {
351		let cache = rocksdb::Cache::new_lru_cache(cache_size).map_err(other_io_err)?;
352		block_opts.set_block_cache(&cache);
353		// "index and filter blocks will be stored in block cache, together with all other data blocks."
354		// See: https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks
355		block_opts.set_cache_index_and_filter_blocks(true);
356		// Don't evict L0 filter/index blocks from the cache
357		block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
358	}
359	block_opts.set_bloom_filter(10, true);
360
361	Ok(block_opts)
362}
363
364impl Database {
365	const CORRUPTION_FILE_NAME: &'static str = "CORRUPTED";
366
367	/// Open database file. Creates if it does not exist.
368	///
369	/// # Safety
370	///
371	/// The number of `config.columns` must not be zero.
372	pub fn open(config: &DatabaseConfig, path: &str) -> io::Result<Database> {
373		assert!(config.columns > 0, "the number of columns must not be zero");
374
375		let opts = generate_options(config);
376		let block_opts = generate_block_based_options(config)?;
377
378		// attempt database repair if it has been previously marked as corrupted
379		let db_corrupted = Path::new(path).join(Database::CORRUPTION_FILE_NAME);
380		if db_corrupted.exists() {
381			warn!("DB has been previously marked as corrupted, attempting repair");
382			DB::repair(&opts, path).map_err(other_io_err)?;
383			fs::remove_file(db_corrupted)?;
384		}
385
386		let column_names: Vec<_> = (0..config.columns).map(|c| format!("col{}", c)).collect();
387		let write_opts = WriteOptions::default();
388		let read_opts = generate_read_options();
389
390		let db = if let Some(secondary_path) = &config.secondary {
391			Self::open_secondary(&opts, path, secondary_path.as_str(), column_names.as_slice())?
392		} else {
393			let column_names: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
394			Self::open_primary(&opts, path, config, column_names.as_slice(), &block_opts)?
395		};
396
397		Ok(Database {
398			db: RwLock::new(Some(DBAndColumns { db, column_names })),
399			config: config.clone(),
400			path: path.to_owned(),
401			opts,
402			read_opts,
403			write_opts,
404			block_opts,
405			stats: stats::RunningDbStats::new(),
406		})
407	}
408
409	/// Internal api to open a database in primary mode.
410	fn open_primary(
411		opts: &Options,
412		path: &str,
413		config: &DatabaseConfig,
414		column_names: &[&str],
415		block_opts: &BlockBasedOptions,
416	) -> io::Result<rocksdb::DB> {
417		let cf_descriptors: Vec<_> = (0..config.columns)
418			.map(|i| ColumnFamilyDescriptor::new(column_names[i as usize], config.column_config(&block_opts, i)))
419			.collect();
420
421		let db = match DB::open_cf_descriptors(&opts, path, cf_descriptors) {
422			Err(_) => {
423				// retry and create CFs
424				match DB::open_cf(&opts, path, &[] as &[&str]) {
425					Ok(mut db) => {
426						for (i, name) in column_names.iter().enumerate() {
427							let _ = db
428								.create_cf(name, &config.column_config(&block_opts, i as u32))
429								.map_err(other_io_err)?;
430						}
431						Ok(db)
432					}
433					err => err,
434				}
435			}
436			ok => ok,
437		};
438
439		Ok(match db {
440			Ok(db) => db,
441			Err(ref s) if is_corrupted(s) => {
442				warn!("DB corrupted: {}, attempting repair", s);
443				DB::repair(&opts, path).map_err(other_io_err)?;
444
445				let cf_descriptors: Vec<_> = (0..config.columns)
446					.map(|i| {
447						ColumnFamilyDescriptor::new(column_names[i as usize], config.column_config(&block_opts, i))
448					})
449					.collect();
450
451				DB::open_cf_descriptors(&opts, path, cf_descriptors).map_err(other_io_err)?
452			}
453			Err(s) => return Err(other_io_err(s)),
454		})
455	}
456
457	/// Internal api to open a database in secondary mode.
458	/// Secondary database needs a seperate path to store its own logs.
459	fn open_secondary(
460		opts: &Options,
461		path: &str,
462		secondary_path: &str,
463		column_names: &[String],
464	) -> io::Result<rocksdb::DB> {
465		let db = DB::open_cf_as_secondary(&opts, path, secondary_path, column_names);
466
467		Ok(match db {
468			Ok(db) => db,
469			Err(ref s) if is_corrupted(s) => {
470				warn!("DB corrupted: {}, attempting repair", s);
471				DB::repair(&opts, path).map_err(other_io_err)?;
472				DB::open_cf_as_secondary(&opts, path, secondary_path, column_names).map_err(other_io_err)?
473			}
474			Err(s) => return Err(other_io_err(s)),
475		})
476	}
477
478	/// Helper to create new transaction for this database.
479	pub fn transaction(&self) -> DBTransaction {
480		DBTransaction::new()
481	}
482
483	/// Commit transaction to database.
484	pub fn write(&self, tr: DBTransaction) -> io::Result<()> {
485		match *self.db.read() {
486			Some(ref cfs) => {
487				let mut batch = WriteBatch::default();
488				let ops = tr.ops;
489
490				self.stats.tally_writes(ops.len() as u64);
491				self.stats.tally_transactions(1);
492
493				let mut stats_total_bytes = 0;
494
495				for op in ops {
496					let cf = cfs.cf(op.col() as usize);
497
498					match op {
499						DBOp::Insert { col: _, key, value } => {
500							stats_total_bytes += key.len() + value.len();
501							batch.put_cf(cf, &key, &value);
502						}
503						DBOp::Delete { col: _, key } => {
504							// We count deletes as writes.
505							stats_total_bytes += key.len();
506							batch.delete_cf(cf, &key);
507						}
508						DBOp::DeletePrefix { col, prefix } => {
509							let end_prefix = tetsy_kvdb::end_prefix(&prefix[..]);
510							let no_end = end_prefix.is_none();
511							let end_range = end_prefix.unwrap_or_else(|| vec![u8::max_value(); 16]);
512							batch.delete_range_cf(cf, &prefix[..], &end_range[..]);
513							if no_end {
514								use crate::iter::IterationHandler as _;
515
516								let prefix = if prefix.len() > end_range.len() { &prefix[..] } else { &end_range[..] };
517								// We call `iter_with_prefix` directly on `cfs` to avoid taking a lock twice
518								// See https://github.com/tetcoin/tetsy-common/pull/396.
519								let read_opts = generate_read_options();
520								for (key, _) in cfs.iter_with_prefix(col, prefix, read_opts) {
521									batch.delete_cf(cf, &key[..]);
522								}
523							}
524						}
525					};
526				}
527				self.stats.tally_bytes_written(stats_total_bytes as u64);
528
529				check_for_corruption(&self.path, cfs.db.write_opt(batch, &self.write_opts))
530			}
531			None => Err(other_io_err("Database is closed")),
532		}
533	}
534
535	/// Get value by key.
536	pub fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
537		match *self.db.read() {
538			Some(ref cfs) => {
539				if cfs.column_names.get(col as usize).is_none() {
540					return Err(other_io_err("column index is out of bounds"));
541				}
542				self.stats.tally_reads(1);
543				let value = cfs
544					.db
545					.get_pinned_cf_opt(cfs.cf(col as usize), key, &self.read_opts)
546					.map(|r| r.map(|v| v.to_vec()))
547					.map_err(other_io_err);
548
549				match value {
550					Ok(Some(ref v)) => self.stats.tally_bytes_read((key.len() + v.len()) as u64),
551					Ok(None) => self.stats.tally_bytes_read(key.len() as u64),
552					_ => {}
553				};
554
555				value
556			}
557			None => Ok(None),
558		}
559	}
560
561	/// Get value by partial key. Prefix size should match configured prefix size.
562	pub fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> Option<Box<[u8]>> {
563		self.iter_with_prefix(col, prefix).next().map(|(_, v)| v)
564	}
565
566	/// Iterator over the data in the given database column index.
567	/// Will hold a lock until the iterator is dropped
568	/// preventing the database from being closed.
569	pub fn iter<'a>(&'a self, col: u32) -> impl Iterator<Item = KeyValuePair> + 'a {
570		let read_lock = self.db.read();
571		let optional = if read_lock.is_some() {
572			let read_opts = generate_read_options();
573			let guarded = iter::ReadGuardedIterator::new(read_lock, col, read_opts);
574			Some(guarded)
575		} else {
576			None
577		};
578		optional.into_iter().flat_map(identity)
579	}
580
581	/// Iterator over data in the `col` database column index matching the given prefix.
582	/// Will hold a lock until the iterator is dropped
583	/// preventing the database from being closed.
584	fn iter_with_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> impl Iterator<Item = iter::KeyValuePair> + 'a {
585		let read_lock = self.db.read();
586		let optional = if read_lock.is_some() {
587			let mut read_opts = generate_read_options();
588			// rocksdb doesn't work with an empty upper bound
589			if let Some(end_prefix) = tetsy_kvdb::end_prefix(prefix) {
590				read_opts.set_iterate_upper_bound(end_prefix);
591			}
592			let guarded = iter::ReadGuardedIterator::new_with_prefix(read_lock, col, prefix, read_opts);
593			Some(guarded)
594		} else {
595			None
596		};
597		optional.into_iter().flat_map(identity)
598	}
599
600	/// Close the database
601	fn close(&self) {
602		*self.db.write() = None;
603	}
604
605	/// Restore the database from a copy at given path.
606	pub fn restore(&self, new_db: &str) -> io::Result<()> {
607		self.close();
608
609		// swap is guaranteed to be atomic
610		match swap(new_db, &self.path) {
611			Ok(_) => {
612				// ignore errors
613				let _ = fs::remove_dir_all(new_db);
614			}
615			Err(err) => {
616				debug!("DB atomic swap failed: {}", err);
617				match swap_nonatomic(new_db, &self.path) {
618					Ok(_) => {
619						// ignore errors
620						let _ = fs::remove_dir_all(new_db);
621					}
622					Err(err) => {
623						warn!("Failed to swap DB directories: {:?}", err);
624						return Err(io::Error::new(
625							io::ErrorKind::Other,
626							"DB restoration failed: could not swap DB directories",
627						));
628					}
629				}
630			}
631		}
632
633		// reopen the database and steal handles into self
634		let db = Self::open(&self.config, &self.path)?;
635		*self.db.write() = mem::replace(&mut *db.db.write(), None);
636		Ok(())
637	}
638
639	/// The number of column families in the db.
640	pub fn num_columns(&self) -> u32 {
641		self.db
642			.read()
643			.as_ref()
644			.and_then(|db| if db.column_names.is_empty() { None } else { Some(db.column_names.len()) })
645			.map(|n| n as u32)
646			.unwrap_or(0)
647	}
648
649	/// The number of keys in a column (estimated).
650	pub fn num_keys(&self, col: u32) -> io::Result<u64> {
651		const ESTIMATE_NUM_KEYS: &str = "rocksdb.estimate-num-keys";
652		match *self.db.read() {
653			Some(ref cfs) => {
654				let cf = cfs.cf(col as usize);
655				match cfs.db.property_int_value_cf(cf, ESTIMATE_NUM_KEYS) {
656					Ok(estimate) => Ok(estimate.unwrap_or_default()),
657					Err(err_string) => Err(other_io_err(err_string)),
658				}
659			}
660			None => Ok(0),
661		}
662	}
663
664	/// Remove the last column family in the database. The deletion is definitive.
665	pub fn remove_last_column(&self) -> io::Result<()> {
666		match *self.db.write() {
667			Some(DBAndColumns { ref mut db, ref mut column_names }) => {
668				if let Some(name) = column_names.pop() {
669					db.drop_cf(&name).map_err(other_io_err)?;
670				}
671				Ok(())
672			}
673			None => Ok(()),
674		}
675	}
676
677	/// Add a new column family to the DB.
678	pub fn add_column(&self) -> io::Result<()> {
679		match *self.db.write() {
680			Some(DBAndColumns { ref mut db, ref mut column_names }) => {
681				let col = column_names.len() as u32;
682				let name = format!("col{}", col);
683				let col_config = self.config.column_config(&self.block_opts, col as u32);
684				let _ = db.create_cf(&name, &col_config).map_err(other_io_err)?;
685				column_names.push(name);
686				Ok(())
687			}
688			None => Ok(()),
689		}
690	}
691
692	/// Get RocksDB statistics.
693	pub fn get_statistics(&self) -> HashMap<String, stats::RocksDbStatsValue> {
694		if let Some(stats) = self.opts.get_statistics() {
695			stats::parse_rocksdb_stats(&stats)
696		} else {
697			HashMap::new()
698		}
699	}
700
701	/// Try to catch up a secondary instance with
702	/// the primary by reading as much from the logs as possible.
703	///
704	/// Guaranteed to have changes up to the the time that `try_catch_up_with_primary` is called
705	/// if it finishes succesfully.
706	///
707	/// Blocks until the MANIFEST file and any state changes in the corresponding Write-Ahead-Logs
708	/// are applied to the secondary instance. If the manifest files are very large
709	/// this method could take a long time.
710	///
711	/// If Write-Ahead-Logs have been purged by the primary instance before the secondary
712	/// is able to open them, the secondary will not be caught up
713	/// until this function is called again and new Write-Ahead-Logs are identified.
714	///
715	/// If called while the primary is writing, the catch-up may fail.
716	///
717	/// If the secondary is unable to catch up because of missing logs,
718	/// this method fails silently and no error is returned.
719	///
720	/// Calling this as primary will return an error.
721	pub fn try_catch_up_with_primary(&self) -> io::Result<()> {
722		match self.db.read().as_ref() {
723			Some(DBAndColumns { db, .. }) => db.try_catch_up_with_primary().map_err(other_io_err),
724			None => Ok(()),
725		}
726	}
727}
728
729// duplicate declaration of methods here to avoid trait import in certain existing cases
730// at time of addition.
731impl KeyValueDB for Database {
732	fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
733		Database::get(self, col, key)
734	}
735
736	fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> Option<Box<[u8]>> {
737		Database::get_by_prefix(self, col, prefix)
738	}
739
740	fn write(&self, transaction: DBTransaction) -> io::Result<()> {
741		Database::write(self, transaction)
742	}
743
744	fn iter<'a>(&'a self, col: u32) -> Box<dyn Iterator<Item = KeyValuePair> + 'a> {
745		let unboxed = Database::iter(self, col);
746		Box::new(unboxed.into_iter())
747	}
748
749	fn iter_with_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> Box<dyn Iterator<Item = KeyValuePair> + 'a> {
750		let unboxed = Database::iter_with_prefix(self, col, prefix);
751		Box::new(unboxed.into_iter())
752	}
753
754	fn restore(&self, new_db: &str) -> io::Result<()> {
755		Database::restore(self, new_db)
756	}
757
758	fn io_stats(&self, kind: tetsy_kvdb::IoStatsKind) -> tetsy_kvdb::IoStats {
759		let rocksdb_stats = self.get_statistics();
760		let cache_hit_count = rocksdb_stats.get("block.cache.hit").map(|s| s.count).unwrap_or(0u64);
761		let overall_stats = self.stats.overall();
762		let old_cache_hit_count = overall_stats.raw.cache_hit_count;
763
764		self.stats.tally_cache_hit_count(cache_hit_count - old_cache_hit_count);
765
766		let taken_stats = match kind {
767			tetsy_kvdb::IoStatsKind::Overall => self.stats.overall(),
768			tetsy_kvdb::IoStatsKind::SincePrevious => self.stats.since_previous(),
769		};
770
771		let mut stats = tetsy_kvdb::IoStats::empty();
772
773		stats.reads = taken_stats.raw.reads;
774		stats.writes = taken_stats.raw.writes;
775		stats.transactions = taken_stats.raw.transactions;
776		stats.bytes_written = taken_stats.raw.bytes_written;
777		stats.bytes_read = taken_stats.raw.bytes_read;
778		stats.cache_reads = taken_stats.raw.cache_hit_count;
779		stats.started = taken_stats.started;
780		stats.span = taken_stats.started.elapsed();
781
782		stats
783	}
784}
785
786#[cfg(test)]
787mod tests {
788	use super::*;
789	use tetsy_kvdb_shared_tests as st;
790	use std::io::{self, Read};
791	use tempfile::Builder as TempfileBuilder;
792
793	fn create(columns: u32) -> io::Result<Database> {
794		let tempdir = TempfileBuilder::new().prefix("").tempdir()?;
795		let config = DatabaseConfig::with_columns(columns);
796		Database::open(&config, tempdir.path().to_str().expect("tempdir path is valid unicode"))
797	}
798
799	#[test]
800	fn get_fails_with_non_existing_column() -> io::Result<()> {
801		let db = create(1)?;
802		st::test_get_fails_with_non_existing_column(&db)
803	}
804
805	#[test]
806	fn put_and_get() -> io::Result<()> {
807		let db = create(1)?;
808		st::test_put_and_get(&db)
809	}
810
811	#[test]
812	fn delete_and_get() -> io::Result<()> {
813		let db = create(1)?;
814		st::test_delete_and_get(&db)
815	}
816
817	#[test]
818	fn delete_prefix() -> io::Result<()> {
819		let db = create(st::DELETE_PREFIX_NUM_COLUMNS)?;
820		st::test_delete_prefix(&db)
821	}
822
823	#[test]
824	fn iter() -> io::Result<()> {
825		let db = create(1)?;
826		st::test_iter(&db)
827	}
828
829	#[test]
830	fn iter_with_prefix() -> io::Result<()> {
831		let db = create(1)?;
832		st::test_iter_with_prefix(&db)
833	}
834
835	#[test]
836	fn complex() -> io::Result<()> {
837		let db = create(1)?;
838		st::test_complex(&db)
839	}
840
841	#[test]
842	fn stats() -> io::Result<()> {
843		let db = create(st::IO_STATS_NUM_COLUMNS)?;
844		st::test_io_stats(&db)
845	}
846
847	#[test]
848	fn secondary_db_get() -> io::Result<()> {
849		let primary = TempfileBuilder::new().prefix("").tempdir()?;
850		let config = DatabaseConfig::with_columns(1);
851		let db = Database::open(&config, primary.path().to_str().expect("tempdir path is valid unicode"))?;
852
853		let key1 = b"key1";
854		let mut transaction = db.transaction();
855		transaction.put(0, key1, b"horse");
856		db.write(transaction)?;
857
858		let config = DatabaseConfig {
859			secondary: TempfileBuilder::new().prefix("").tempdir()?.path().to_str().map(|s| s.to_string()),
860			..DatabaseConfig::with_columns(1)
861		};
862		let second_db = Database::open(&config, primary.path().to_str().expect("tempdir path is valid unicode"))?;
863		assert_eq!(&*second_db.get(0, key1)?.unwrap(), b"horse");
864		Ok(())
865	}
866
867	#[test]
868	fn secondary_db_catch_up() -> io::Result<()> {
869		let primary = TempfileBuilder::new().prefix("").tempdir()?;
870		let config = DatabaseConfig::with_columns(1);
871		let db = Database::open(&config, primary.path().to_str().expect("tempdir path is valid unicode"))?;
872
873		let config = DatabaseConfig {
874			secondary: TempfileBuilder::new().prefix("").tempdir()?.path().to_str().map(|s| s.to_string()),
875			..DatabaseConfig::with_columns(1)
876		};
877		let second_db = Database::open(&config, primary.path().to_str().expect("tempdir path is valid unicode"))?;
878
879		let mut transaction = db.transaction();
880		transaction.put(0, b"key1", b"mule");
881		transaction.put(0, b"key2", b"cat");
882		db.write(transaction)?;
883
884		second_db.try_catch_up_with_primary()?;
885		assert_eq!(&*second_db.get(0, b"key2")?.unwrap(), b"cat");
886		Ok(())
887	}
888
889	#[test]
890	fn mem_tables_size() {
891		let tempdir = TempfileBuilder::new().prefix("").tempdir().unwrap();
892
893		let config = DatabaseConfig {
894			max_open_files: 512,
895			memory_budget: HashMap::new(),
896			compaction: CompactionProfile::default(),
897			columns: 11,
898			keep_log_file_num: 1,
899			enable_statistics: false,
900			secondary: None,
901		};
902
903		let db = Database::open(&config, tempdir.path().to_str().unwrap()).unwrap();
904
905		let mut batch = db.transaction();
906		for i in 0u32..10000u32 {
907			batch.put(i / 1000 + 1, &i.to_le_bytes(), &(i * 17).to_le_bytes());
908		}
909		db.write(batch).unwrap();
910
911		{
912			let db = db.db.read();
913			db.as_ref().map(|db| {
914				assert!(db.static_property_or_warn(0, "rocksdb.cur-size-all-mem-tables") > 512);
915			});
916		}
917	}
918
919	#[test]
920	#[cfg(target_os = "linux")]
921	fn df_to_rotational() {
922		use std::path::PathBuf;
923		// Example df output.
924		let example_df = vec![
925			70, 105, 108, 101, 115, 121, 115, 116, 101, 109, 32, 32, 32, 32, 32, 49, 75, 45, 98, 108, 111, 99, 107,
926			115, 32, 32, 32, 32, 32, 85, 115, 101, 100, 32, 65, 118, 97, 105, 108, 97, 98, 108, 101, 32, 85, 115, 101,
927			37, 32, 77, 111, 117, 110, 116, 101, 100, 32, 111, 110, 10, 47, 100, 101, 118, 47, 115, 100, 97, 49, 32,
928			32, 32, 32, 32, 32, 32, 54, 49, 52, 48, 57, 51, 48, 48, 32, 51, 56, 56, 50, 50, 50, 51, 54, 32, 32, 49, 57,
929			52, 52, 52, 54, 49, 54, 32, 32, 54, 55, 37, 32, 47, 10,
930		];
931		let expected_output = Some(PathBuf::from("/sys/block/sda/queue/rotational"));
932		assert_eq!(rotational_from_df_output(example_df), expected_output);
933	}
934
935	#[test]
936	#[should_panic]
937	fn db_config_with_zero_columns() {
938		let _cfg = DatabaseConfig::with_columns(0);
939	}
940
941	#[test]
942	#[should_panic]
943	fn open_db_with_zero_columns() {
944		let cfg = DatabaseConfig { columns: 0, ..Default::default() };
945		let _db = Database::open(&cfg, "");
946	}
947
948	#[test]
949	fn add_columns() {
950		let config_1 = DatabaseConfig::default();
951		let config_5 = DatabaseConfig::with_columns(5);
952
953		let tempdir = TempfileBuilder::new().prefix("").tempdir().unwrap();
954
955		// open 1, add 4.
956		{
957			let db = Database::open(&config_1, tempdir.path().to_str().unwrap()).unwrap();
958			assert_eq!(db.num_columns(), 1);
959
960			for i in 2..=5 {
961				db.add_column().unwrap();
962				assert_eq!(db.num_columns(), i);
963			}
964		}
965
966		// reopen as 5.
967		{
968			let db = Database::open(&config_5, tempdir.path().to_str().unwrap()).unwrap();
969			assert_eq!(db.num_columns(), 5);
970		}
971	}
972
973	#[test]
974	fn remove_columns() {
975		let config_1 = DatabaseConfig::default();
976		let config_5 = DatabaseConfig::with_columns(5);
977
978		let tempdir = TempfileBuilder::new().prefix("drop_columns").tempdir().unwrap();
979
980		// open 5, remove 4.
981		{
982			let db = Database::open(&config_5, tempdir.path().to_str().unwrap()).expect("open with 5 columns");
983			assert_eq!(db.num_columns(), 5);
984
985			for i in (1..5).rev() {
986				db.remove_last_column().unwrap();
987				assert_eq!(db.num_columns(), i);
988			}
989		}
990
991		// reopen as 1.
992		{
993			let db = Database::open(&config_1, tempdir.path().to_str().unwrap()).unwrap();
994			assert_eq!(db.num_columns(), 1);
995		}
996	}
997
998	#[test]
999	fn test_num_keys() {
1000		let tempdir = TempfileBuilder::new().prefix("").tempdir().unwrap();
1001		let config = DatabaseConfig::with_columns(1);
1002		let db = Database::open(&config, tempdir.path().to_str().unwrap()).unwrap();
1003
1004		assert_eq!(db.num_keys(0).unwrap(), 0, "database is empty after creation");
1005		let key1 = b"beef";
1006		let mut batch = db.transaction();
1007		batch.put(0, key1, key1);
1008		db.write(batch).unwrap();
1009		assert_eq!(db.num_keys(0).unwrap(), 1, "adding a key increases the count");
1010	}
1011
1012	#[test]
1013	fn default_memory_budget() {
1014		let c = DatabaseConfig::default();
1015		assert_eq!(c.columns, 1);
1016		assert_eq!(c.memory_budget(), DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB, "total memory budget is default");
1017		assert_eq!(
1018			c.memory_budget_for_col(0),
1019			DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB,
1020			"total memory budget for column 0 is the default"
1021		);
1022		assert_eq!(
1023			c.memory_budget_for_col(999),
1024			DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB,
1025			"total memory budget for any column is the default"
1026		);
1027	}
1028
1029	#[test]
1030	fn memory_budget() {
1031		let mut c = DatabaseConfig::with_columns(3);
1032		c.memory_budget = [(0, 10), (1, 15), (2, 20)].iter().cloned().collect();
1033		assert_eq!(c.memory_budget(), 45 * MB, "total budget is the sum of the column budget");
1034	}
1035
1036	#[test]
1037	fn test_stats_parser() {
1038		let raw = r#"rocksdb.row.cache.hit COUNT : 1
1039rocksdb.db.get.micros P50 : 2.000000 P95 : 3.000000 P99 : 4.000000 P100 : 5.000000 COUNT : 0 SUM : 15
1040"#;
1041		let stats = stats::parse_rocksdb_stats(raw);
1042		assert_eq!(stats["row.cache.hit"].count, 1);
1043		assert!(stats["row.cache.hit"].times.is_none());
1044		assert_eq!(stats["db.get.micros"].count, 0);
1045		let get_times = stats["db.get.micros"].times.unwrap();
1046		assert_eq!(get_times.sum, 15);
1047		assert_eq!(get_times.p50, 2.0);
1048		assert_eq!(get_times.p95, 3.0);
1049		assert_eq!(get_times.p99, 4.0);
1050		assert_eq!(get_times.p100, 5.0);
1051	}
1052
1053	#[test]
1054	fn rocksdb_settings() {
1055		const NUM_COLS: usize = 2;
1056		let mut cfg = DatabaseConfig { enable_statistics: true, ..DatabaseConfig::with_columns(NUM_COLS as u32) };
1057		cfg.max_open_files = 123; // is capped by the OS fd limit (typically 1024)
1058		cfg.compaction.block_size = 323232;
1059		cfg.compaction.initial_file_size = 102030;
1060		cfg.memory_budget = [(0, 30), (1, 300)].iter().cloned().collect();
1061
1062		let db_path = TempfileBuilder::new().prefix("config_test").tempdir().expect("the OS can create tmp dirs");
1063		let db = Database::open(&cfg, db_path.path().to_str().unwrap()).expect("can open a db");
1064		let mut rocksdb_log = std::fs::File::open(format!("{}/LOG", db_path.path().to_str().unwrap()))
1065			.expect("rocksdb creates a LOG file");
1066		let mut settings = String::new();
1067		let statistics = db.get_statistics();
1068		assert!(statistics.contains_key("block.cache.hit"));
1069
1070		rocksdb_log.read_to_string(&mut settings).unwrap();
1071		// Check column count
1072		assert!(settings.contains("Options for column family [default]"), "no default col");
1073		assert!(settings.contains("Options for column family [col0]"), "no col0");
1074		assert!(settings.contains("Options for column family [col1]"), "no col1");
1075
1076		// Check max_open_files
1077		assert!(settings.contains("max_open_files: 123"));
1078
1079		// Check block size
1080		assert!(settings.contains(" block_size: 323232"));
1081
1082		// LRU cache (default column)
1083		assert!(settings.contains("block_cache_options:\n    capacity : 8388608"));
1084		// LRU cache for non-default columns is ⅓ of memory budget (including default column)
1085		let lru_size = (330 * MB) / 3;
1086		let needle = format!("block_cache_options:\n    capacity : {}", lru_size);
1087		let lru = settings.match_indices(&needle).collect::<Vec<_>>().len();
1088		assert_eq!(lru, NUM_COLS);
1089
1090		// Index/filters share cache
1091		let include_indexes = settings.matches("cache_index_and_filter_blocks: 1").collect::<Vec<_>>().len();
1092		assert_eq!(include_indexes, NUM_COLS);
1093		// Pin index/filters on L0
1094		let pins = settings.matches("pin_l0_filter_and_index_blocks_in_cache: 1").collect::<Vec<_>>().len();
1095		assert_eq!(pins, NUM_COLS);
1096
1097		// Check target file size, aka initial file size
1098		let l0_sizes = settings.matches("target_file_size_base: 102030").collect::<Vec<_>>().len();
1099		assert_eq!(l0_sizes, NUM_COLS);
1100		// The default column uses the default of 64Mb regardless of the setting.
1101		assert!(settings.contains("target_file_size_base: 67108864"));
1102
1103		// Check compression settings
1104		let snappy_compression = settings.matches("Options.compression: Snappy").collect::<Vec<_>>().len();
1105		// All columns use Snappy
1106		assert_eq!(snappy_compression, NUM_COLS + 1);
1107		// …even for L7
1108		let snappy_bottommost = settings.matches("Options.bottommost_compression: Disabled").collect::<Vec<_>>().len();
1109		assert_eq!(snappy_bottommost, NUM_COLS + 1);
1110
1111		// 7 levels
1112		let levels = settings.matches("Options.num_levels: 7").collect::<Vec<_>>().len();
1113		assert_eq!(levels, NUM_COLS + 1);
1114
1115		// Don't fsync every store
1116		assert!(settings.contains("Options.use_fsync: 0"));
1117
1118		// We're using the new format
1119		assert!(settings.contains("format_version: 5"));
1120	}
1121}