// kvdb-rocksdb: `src/lib.rs`

// Copyright 2020 Parity Technologies
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
8
9mod iter;
10mod stats;
11
12use std::{
13	cmp,
14	collections::HashMap,
15	error, io,
16	path::{Path, PathBuf},
17};
18
19use rocksdb::{
20	BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, CompactOptions, Options, ReadOptions, WriteBatch,
21	WriteOptions, DB,
22};
23
24use kvdb::{DBKeyValue, DBOp, DBTransaction, DBValue, KeyValueDB};
25
26#[cfg(target_os = "linux")]
27use regex::Regex;
28#[cfg(target_os = "linux")]
29use std::fs::File;
30#[cfg(target_os = "linux")]
31use std::process::Command;
32
/// Wrap an arbitrary error into an `io::Error` of kind `Other`.
fn other_io_err<E>(e: E) -> io::Error
where
	E: Into<Box<dyn error::Error + Send + Sync>>,
{
	// Box the error first; `io::Error::new` stores it as the source.
	let boxed: Box<dyn error::Error + Send + Sync> = e.into();
	io::Error::new(io::ErrorKind::Other, boxed)
}
39
40fn invalid_column(col: u32) -> io::Error {
41	other_io_err(format!("No such column family: {:?}", col))
42}
43
44// Used for memory budget.
45type MiB = usize;
46
47const KB: usize = 1_024;
48const MB: usize = 1_024 * KB;
49
50/// The default column memory budget in MiB.
51pub const DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB: MiB = 128;
52
53/// The default memory budget in MiB.
54pub const DB_DEFAULT_MEMORY_BUDGET_MB: MiB = 512;
55
/// Compaction profile for the database settings
/// Note, that changing these parameters may trigger
/// the compaction process of RocksDB on startup.
/// https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is-true
#[derive(Clone, Copy, PartialEq, Debug)]
pub struct CompactionProfile {
	/// L0-L1 target file size
	/// The minimum size should be calculated in accordance with the
	/// number of levels and the expected size of the database.
	pub initial_file_size: u64,
	/// Block size, in bytes, handed to RocksDB's block-based table options.
	pub block_size: usize,
}
69
70impl Default for CompactionProfile {
71	/// Default profile suitable for most storage
72	fn default() -> CompactionProfile {
73		CompactionProfile::ssd()
74	}
75}
76
77/// Given output of df command return Linux rotational flag file path.
78#[cfg(target_os = "linux")]
79pub fn rotational_from_df_output(df_out: Vec<u8>) -> Option<PathBuf> {
80	use std::str;
81	str::from_utf8(df_out.as_slice())
82		.ok()
83		// Get the drive name.
84		.and_then(|df_str| {
85			Regex::new(r"/dev/(sd[:alpha:]{1,2})")
86				.ok()
87				.and_then(|re| re.captures(df_str))
88				.and_then(|captures| captures.get(1))
89		})
90		// Generate path e.g. /sys/block/sda/queue/rotational
91		.map(|drive_path| {
92			let mut p = PathBuf::from("/sys/block");
93			p.push(drive_path.as_str());
94			p.push("queue/rotational");
95			p
96		})
97}
98
99impl CompactionProfile {
100	/// Attempt to determine the best profile automatically, only Linux for now.
101	#[cfg(target_os = "linux")]
102	pub fn auto<P: AsRef<Path>>(db_path: P) -> CompactionProfile {
103		use std::io::Read;
104		let hdd_check_file = db_path
105			.as_ref()
106			.to_str()
107			.and_then(|path_str| Command::new("df").arg(path_str).output().ok())
108			.and_then(|df_res| if df_res.status.success() { Some(df_res.stdout) } else { None })
109			.and_then(rotational_from_df_output);
110		// Read out the file and match compaction profile.
111		if let Some(hdd_check) = hdd_check_file {
112			if let Ok(mut file) = File::open(hdd_check.as_path()) {
113				let mut buffer = [0; 1];
114				if file.read_exact(&mut buffer).is_ok() {
115					// 0 means not rotational.
116					if buffer == [48] {
117						return Self::ssd()
118					}
119					// 1 means rotational.
120					if buffer == [49] {
121						return Self::hdd()
122					}
123				}
124			}
125		}
126		// Fallback if drive type was not determined.
127		Self::default()
128	}
129
130	/// Just default for other platforms.
131	#[cfg(not(target_os = "linux"))]
132	pub fn auto<P: AsRef<Path>>(_db_path: P) -> CompactionProfile {
133		Self::default()
134	}
135
136	/// Default profile suitable for SSD storage
137	pub fn ssd() -> CompactionProfile {
138		CompactionProfile { initial_file_size: 64 * MB as u64, block_size: 16 * KB }
139	}
140
141	/// Slow HDD compaction profile
142	pub fn hdd() -> CompactionProfile {
143		CompactionProfile { initial_file_size: 256 * MB as u64, block_size: 64 * KB }
144	}
145}
146
/// Database configuration
#[derive(Clone)]
#[non_exhaustive]
pub struct DatabaseConfig {
	/// Max number of open files.
	pub max_open_files: i32,
	/// Memory budget (in MiB) used for setting block cache size and
	/// write buffer size for each column including the default one.
	/// If the memory budget of a column is not specified,
	/// `DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB` is used for that column.
	pub memory_budget: HashMap<u32, MiB>,
	/// Compaction profile.
	pub compaction: CompactionProfile,
	/// Set number of columns.
	///
	/// # Safety
	///
	/// The number of columns must not be zero; `Database::open` asserts this.
	pub columns: u32,
	/// Specify the maximum number of info/debug log files to be kept.
	pub keep_log_file_num: i32,
	/// Enable native RocksDB statistics.
	/// Disabled by default.
	///
	/// It can have a negative performance impact up to 10% according to
	/// https://github.com/facebook/rocksdb/wiki/Statistics.
	pub enable_statistics: bool,
	/// Open the database as a secondary instance.
	/// Specify a path for the secondary instance of the database.
	/// Secondary instances are read-only and kept updated by tailing the rocksdb MANIFEST.
	/// It is up to the user to call `catch_up_with_primary()` manually to update the secondary db.
	/// Disabled by default.
	///
	/// `max_open_files` is overridden to always equal `-1`.
	/// May have a negative performance impact on the secondary instance
	/// if the secondary instance reads and applies state changes before the primary instance compacts them.
	/// More info: https://github.com/facebook/rocksdb/wiki/Secondary-instance
	pub secondary: Option<PathBuf>,
	/// Limit the size (in bytes) of write ahead logs
	/// More info: https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log
	pub max_total_wal_size: Option<u64>,
	/// Creates a new database if no database exists.
	/// Set to `true` by default for backwards compatibility.
	pub create_if_missing: bool,
}
192
193impl DatabaseConfig {
194	/// Create new `DatabaseConfig` with default parameters and specified set of columns.
195	/// Note that cache sizes must be explicitly set.
196	///
197	/// # Safety
198	///
199	/// The number of `columns` must not be zero.
200	pub fn with_columns(columns: u32) -> Self {
201		assert!(columns > 0, "the number of columns must not be zero");
202
203		Self { columns, ..Default::default() }
204	}
205
206	/// Returns the total memory budget in bytes.
207	pub fn memory_budget(&self) -> MiB {
208		(0..self.columns)
209			.map(|i| self.memory_budget.get(&i).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB)
210			.sum()
211	}
212
213	/// Returns the memory budget of the specified column in bytes.
214	fn memory_budget_for_col(&self, col: u32) -> MiB {
215		self.memory_budget.get(&col).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB
216	}
217
218	// Get column family configuration with the given block based options.
219	fn column_config(&self, block_opts: &BlockBasedOptions, col: u32) -> Options {
220		let column_mem_budget = self.memory_budget_for_col(col);
221		let mut opts = Options::default();
222
223		opts.set_level_compaction_dynamic_level_bytes(true);
224		opts.set_block_based_table_factory(block_opts);
225		opts.optimize_level_style_compaction(column_mem_budget);
226		opts.set_target_file_size_base(self.compaction.initial_file_size);
227		opts.set_compression_per_level(&[]);
228
229		opts
230	}
231}
232
233impl Default for DatabaseConfig {
234	fn default() -> DatabaseConfig {
235		DatabaseConfig {
236			max_open_files: 512,
237			memory_budget: HashMap::new(),
238			compaction: CompactionProfile::default(),
239			columns: 1,
240			keep_log_file_num: 1,
241			enable_statistics: false,
242			secondary: None,
243			max_total_wal_size: None,
244			create_if_missing: true,
245		}
246	}
247}
248
/// The opened RocksDB instance together with the ordered list of column
/// family names it manages (`"colN"` for column index `N`).
struct DBAndColumns {
	db: DB,
	column_names: Vec<String>,
}
253
254impl DBAndColumns {
255	fn cf(&self, i: usize) -> io::Result<&ColumnFamily> {
256		let name = self.column_names.get(i).ok_or_else(|| invalid_column(i as u32))?;
257		self.db
258			.cf_handle(&name)
259			.ok_or_else(|| other_io_err(format!("invalid column name: {name}")))
260	}
261}
262
/// Key-Value database.
pub struct Database {
	/// The opened RocksDB handle plus its column family names.
	inner: DBAndColumns,
	/// The configuration this database was opened with (cloned in `open`).
	config: DatabaseConfig,
	/// Top-level RocksDB options; also queried for native statistics.
	opts: Options,
	/// Options applied to every write batch.
	write_opts: WriteOptions,
	/// Options applied to every point read.
	read_opts: ReadOptions,
	/// Block-based table options, kept so columns added later use the same settings.
	block_opts: BlockBasedOptions,
	/// In-process read/write counters backing `io_stats`.
	stats: stats::RunningDbStats,
}
273
274/// Generate the options for RocksDB, based on the given `DatabaseConfig`.
275fn generate_options(config: &DatabaseConfig) -> Options {
276	let mut opts = Options::default();
277
278	opts.set_report_bg_io_stats(true);
279	if config.enable_statistics {
280		opts.enable_statistics();
281	}
282	opts.set_use_fsync(false);
283	opts.create_if_missing(config.create_if_missing);
284	if config.secondary.is_some() {
285		opts.set_max_open_files(-1)
286	} else {
287		opts.set_max_open_files(config.max_open_files);
288	}
289	opts.set_bytes_per_sync(1 * MB as u64);
290	opts.set_keep_log_file_num(1);
291	opts.increase_parallelism(cmp::max(1, num_cpus::get() as i32 / 2));
292	if let Some(m) = config.max_total_wal_size {
293		opts.set_max_total_wal_size(m);
294	}
295
296	opts
297}
298
299fn generate_read_options() -> ReadOptions {
300	let mut read_opts = ReadOptions::default();
301	read_opts.set_verify_checksums(false);
302	read_opts
303}
304
305/// Generate the block based options for RocksDB, based on the given `DatabaseConfig`.
306fn generate_block_based_options(config: &DatabaseConfig) -> io::Result<BlockBasedOptions> {
307	let mut block_opts = BlockBasedOptions::default();
308	block_opts.set_block_size(config.compaction.block_size);
309	// See https://github.com/facebook/rocksdb/blob/a1523efcdf2f0e8133b9a9f6e170a0dad49f928f/include/rocksdb/table.h#L246-L271 for details on what the format versions are/do.
310	block_opts.set_format_version(5);
311	block_opts.set_block_restart_interval(16);
312	// Set cache size as recommended by
313	// https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#block-cache-size
314	let cache_size = config.memory_budget() / 3;
315	if cache_size == 0 {
316		block_opts.disable_cache()
317	} else {
318		let cache = rocksdb::Cache::new_lru_cache(cache_size);
319		block_opts.set_block_cache(&cache);
320		// "index and filter blocks will be stored in block cache, together with all other data blocks."
321		// See: https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks
322		block_opts.set_cache_index_and_filter_blocks(true);
323		// Don't evict L0 filter/index blocks from the cache
324		block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
325	}
326	block_opts.set_bloom_filter(10.0, true);
327
328	Ok(block_opts)
329}
330
impl Database {
	/// Open database file.
	///
	/// # Safety
	///
	/// The number of `config.columns` must not be zero.
	pub fn open<P: AsRef<Path>>(config: &DatabaseConfig, path: P) -> io::Result<Database> {
		assert!(config.columns > 0, "the number of columns must not be zero");

		let opts = generate_options(config);
		let block_opts = generate_block_based_options(config)?;

		// Column family names are derived from the column index: "col0", "col1", …
		let column_names: Vec<_> = (0..config.columns).map(|c| format!("col{}", c)).collect();
		let write_opts = WriteOptions::default();
		let read_opts = generate_read_options();

		let db = if let Some(secondary_path) = &config.secondary {
			Self::open_secondary(&opts, path.as_ref(), secondary_path.as_ref(), column_names.as_slice())?
		} else {
			let column_names: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
			Self::open_primary(&opts, path.as_ref(), config, column_names.as_slice(), &block_opts)?
		};

		Ok(Database {
			inner: DBAndColumns { db, column_names },
			config: config.clone(),
			opts,
			read_opts,
			write_opts,
			block_opts,
			stats: stats::RunningDbStats::new(),
		})
	}

	/// Internal api to open a database in primary mode.
	///
	/// First tries to open with all column family descriptors; if that fails
	/// (e.g. the column families do not exist yet), reopens with none and
	/// creates them one by one.
	fn open_primary<P: AsRef<Path>>(
		opts: &Options,
		path: P,
		config: &DatabaseConfig,
		column_names: &[&str],
		block_opts: &BlockBasedOptions,
	) -> io::Result<rocksdb::DB> {
		let cf_descriptors: Vec<_> = (0..config.columns)
			.map(|i| ColumnFamilyDescriptor::new(column_names[i as usize], config.column_config(&block_opts, i)))
			.collect();

		let db = match DB::open_cf_descriptors(&opts, path.as_ref(), cf_descriptors) {
			Err(_) => {
				// retry and create CFs
				match DB::open_cf(&opts, path.as_ref(), &[] as &[&str]) {
					Ok(mut db) => {
						for (i, name) in column_names.iter().enumerate() {
							let _ = db
								.create_cf(name, &config.column_config(&block_opts, i as u32))
								.map_err(other_io_err)?;
						}
						Ok(db)
					},
					err => err,
				}
			},
			ok => ok,
		};

		Ok(match db {
			Ok(db) => db,
			Err(s) => return Err(other_io_err(s)),
		})
	}

	/// Internal api to open a database in secondary mode.
	/// Secondary database needs a separate path to store its own logs.
	fn open_secondary<P: AsRef<Path>>(
		opts: &Options,
		path: P,
		secondary_path: P,
		column_names: &[String],
	) -> io::Result<rocksdb::DB> {
		let db = DB::open_cf_as_secondary(&opts, path.as_ref(), secondary_path.as_ref(), column_names);

		Ok(match db {
			Ok(db) => db,
			Err(s) => return Err(other_io_err(s)),
		})
	}

	/// Helper to create new transaction for this database.
	pub fn transaction(&self) -> DBTransaction {
		DBTransaction::new()
	}

	/// Commit transaction to database.
	///
	/// All operations are collected into a single RocksDB `WriteBatch`
	/// and written atomically at the end.
	pub fn write(&self, tr: DBTransaction) -> io::Result<()> {
		let cfs = &self.inner;
		let mut batch = WriteBatch::default();
		let ops = tr.ops;

		self.stats.tally_writes(ops.len() as u64);
		self.stats.tally_transactions(1);

		let mut stats_total_bytes = 0;

		for op in ops {
			let col = op.col();
			let cf = cfs.cf(col as usize)?;

			match op {
				DBOp::Insert { col: _, key, value } => {
					stats_total_bytes += key.len() + value.len();
					batch.put_cf(cf, &key, &value);
				},
				DBOp::Delete { col: _, key } => {
					// We count deletes as writes.
					stats_total_bytes += key.len();
					batch.delete_cf(cf, &key);
				},
				DBOp::DeletePrefix { col, prefix } => {
					let end_prefix = kvdb::end_prefix(&prefix[..]);
					let no_end = end_prefix.is_none();
					// With no natural end of range, a synthetic upper bound of
					// sixteen 0xff bytes is used for the range delete.
					let end_range = end_prefix.unwrap_or_else(|| vec![u8::max_value(); 16]);
					batch.delete_range_cf(cf, &prefix[..], &end_range[..]);
					if no_end {
						// NOTE(review): keys sorting at or above the synthetic
						// bound are not covered by the range delete, so they are
						// deleted one by one here — presumably `kvdb::end_prefix`
						// returns `None` for empty/all-0xff prefixes; verify
						// against the kvdb crate docs.
						let prefix = if prefix.len() > end_range.len() { &prefix[..] } else { &end_range[..] };
						for result in self.iter_with_prefix(col, prefix) {
							let (key, _) = result?;
							batch.delete_cf(cf, &key[..]);
						}
					}
				},
			};
		}
		self.stats.tally_bytes_written(stats_total_bytes as u64);

		cfs.db.write_opt(batch, &self.write_opts).map_err(other_io_err)
	}

	/// Get value by key.
	pub fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
		let cfs = &self.inner;
		let cf = cfs.cf(col as usize)?;
		self.stats.tally_reads(1);
		let value = cfs
			.db
			.get_pinned_cf_opt(cf, key, &self.read_opts)
			.map(|r| r.map(|v| v.to_vec()))
			.map_err(other_io_err);

		// Misses still count the key bytes as read; errors count nothing.
		match value {
			Ok(Some(ref v)) => self.stats.tally_bytes_read((key.len() + v.len()) as u64),
			Ok(None) => self.stats.tally_bytes_read(key.len() as u64),
			_ => {},
		};

		value
	}

	/// Get value by partial key. Prefix size should match configured prefix size.
	pub fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> io::Result<Option<DBValue>> {
		self.iter_with_prefix(col, prefix)
			.next()
			.transpose()
			.map(|m| m.map(|(_k, v)| v))
	}

	/// Iterator over the data in the given database column index.
	/// Will hold a lock until the iterator is dropped
	/// preventing the database from being closed.
	pub fn iter<'a>(&'a self, col: u32) -> impl Iterator<Item = io::Result<DBKeyValue>> + 'a {
		let read_opts = generate_read_options();
		iter::IterationHandler::iter(&self.inner, col, read_opts)
	}

	/// Iterator over data in the `col` database column index matching the given prefix.
	/// Will hold a lock until the iterator is dropped
	/// preventing the database from being closed.
	fn iter_with_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> impl Iterator<Item = io::Result<DBKeyValue>> + 'a {
		let mut read_opts = generate_read_options();
		// rocksdb doesn't work with an empty upper bound
		if let Some(end_prefix) = kvdb::end_prefix(prefix) {
			read_opts.set_iterate_upper_bound(end_prefix);
		}
		iter::IterationHandler::iter_with_prefix(&self.inner, col, prefix, read_opts)
	}

	/// The number of column families in the db.
	pub fn num_columns(&self) -> u32 {
		self.inner.column_names.len() as u32
	}

	/// The number of keys in a column (estimated).
	pub fn num_keys(&self, col: u32) -> io::Result<u64> {
		// RocksDB-internal property; the value is an estimate, not an exact count.
		const ESTIMATE_NUM_KEYS: &str = "rocksdb.estimate-num-keys";
		let cfs = &self.inner;
		let cf = cfs.cf(col as usize)?;
		match cfs.db.property_int_value_cf(cf, ESTIMATE_NUM_KEYS) {
			Ok(estimate) => Ok(estimate.unwrap_or_default()),
			Err(err_string) => Err(other_io_err(err_string)),
		}
	}

	/// Remove the last column family in the database. The deletion is definitive.
	pub fn remove_last_column(&mut self) -> io::Result<()> {
		let DBAndColumns { ref mut db, ref mut column_names } = self.inner;
		if let Some(name) = column_names.pop() {
			db.drop_cf(&name).map_err(other_io_err)?;
		}
		Ok(())
	}

	/// Add a new column family to the DB.
	///
	/// The new column reuses the block options the database was opened with
	/// and is named after its index ("colN").
	pub fn add_column(&mut self) -> io::Result<()> {
		let DBAndColumns { ref mut db, ref mut column_names } = self.inner;
		let col = column_names.len() as u32;
		let name = format!("col{}", col);
		let col_config = self.config.column_config(&self.block_opts, col as u32);
		let _ = db.create_cf(&name, &col_config).map_err(other_io_err)?;
		column_names.push(name);
		Ok(())
	}

	/// Get RocksDB statistics.
	///
	/// Returns an empty map unless the database was opened with
	/// `enable_statistics` set.
	pub fn get_statistics(&self) -> HashMap<String, stats::RocksDbStatsValue> {
		if let Some(stats) = self.opts.get_statistics() {
			stats::parse_rocksdb_stats(&stats)
		} else {
			HashMap::new()
		}
	}

	/// Try to catch up a secondary instance with
	/// the primary by reading as much from the logs as possible.
	///
	/// Guaranteed to have changes up to the time that `try_catch_up_with_primary` is called
	/// if it finishes successfully.
	///
	/// Blocks until the MANIFEST file and any state changes in the corresponding Write-Ahead-Logs
	/// are applied to the secondary instance. If the manifest files are very large
	/// this method could take a long time.
	///
	/// If Write-Ahead-Logs have been purged by the primary instance before the secondary
	/// is able to open them, the secondary will not be caught up
	/// until this function is called again and new Write-Ahead-Logs are identified.
	///
	/// If called while the primary is writing, the catch-up may fail.
	///
	/// If the secondary is unable to catch up because of missing logs,
	/// this method fails silently and no error is returned.
	///
	/// Calling this as primary will return an error.
	pub fn try_catch_up_with_primary(&self) -> io::Result<()> {
		self.inner.db.try_catch_up_with_primary().map_err(other_io_err)
	}

	/// Force compact a single column.
	///
	/// After compaction of the column, this may lead to better read performance.
	pub fn force_compact(&self, col: u32) -> io::Result<()> {
		let mut compact_options = CompactOptions::default();
		compact_options.set_bottommost_level_compaction(rocksdb::BottommostLevelCompaction::Force);
		// `None` bounds compact the whole key range of the column.
		self.inner.db.compact_range_cf_opt(
			self.inner.cf(col as usize)?,
			None::<Vec<u8>>,
			None::<Vec<u8>>,
			&compact_options,
		);
		Ok(())
	}
}
599
600// Duplicate declaration of methods here to avoid trait import in certain existing cases
601// at time of addition.
602impl KeyValueDB for Database {
603	fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
604		Database::get(self, col, key)
605	}
606
607	fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> io::Result<Option<DBValue>> {
608		Database::get_by_prefix(self, col, prefix)
609	}
610
611	fn write(&self, transaction: DBTransaction) -> io::Result<()> {
612		Database::write(self, transaction)
613	}
614
615	fn iter<'a>(&'a self, col: u32) -> Box<dyn Iterator<Item = io::Result<DBKeyValue>> + 'a> {
616		let unboxed = Database::iter(self, col);
617		Box::new(unboxed.into_iter())
618	}
619
620	fn iter_with_prefix<'a>(
621		&'a self,
622		col: u32,
623		prefix: &'a [u8],
624	) -> Box<dyn Iterator<Item = io::Result<DBKeyValue>> + 'a> {
625		let unboxed = Database::iter_with_prefix(self, col, prefix);
626		Box::new(unboxed.into_iter())
627	}
628
629	fn io_stats(&self, kind: kvdb::IoStatsKind) -> kvdb::IoStats {
630		let rocksdb_stats = self.get_statistics();
631		let cache_hit_count = rocksdb_stats.get("block.cache.hit").map(|s| s.count).unwrap_or(0u64);
632		let overall_stats = self.stats.overall();
633		let old_cache_hit_count = overall_stats.raw.cache_hit_count;
634
635		self.stats.tally_cache_hit_count(cache_hit_count - old_cache_hit_count);
636
637		let taken_stats = match kind {
638			kvdb::IoStatsKind::Overall => self.stats.overall(),
639			kvdb::IoStatsKind::SincePrevious => self.stats.since_previous(),
640		};
641
642		let mut stats = kvdb::IoStats::empty();
643
644		stats.reads = taken_stats.raw.reads;
645		stats.writes = taken_stats.raw.writes;
646		stats.transactions = taken_stats.raw.transactions;
647		stats.bytes_written = taken_stats.raw.bytes_written;
648		stats.bytes_read = taken_stats.raw.bytes_read;
649		stats.cache_reads = taken_stats.raw.cache_hit_count;
650		stats.started = taken_stats.started;
651		stats.span = taken_stats.started.elapsed();
652
653		stats
654	}
655}
656
#[cfg(test)]
mod tests {
	use super::*;
	use kvdb_shared_tests as st;
	use std::io::{self, Read};
	use tempfile::Builder as TempfileBuilder;

	/// Open a fresh database with `columns` columns in a new temporary directory.
	fn create(columns: u32) -> io::Result<Database> {
		let tempdir = TempfileBuilder::new().prefix("").tempdir()?;
		let config = DatabaseConfig::with_columns(columns);
		Database::open(&config, tempdir.path().to_str().expect("tempdir path is valid unicode"))
	}

	#[test]
	fn get_fails_with_non_existing_column() -> io::Result<()> {
		let db = create(1)?;
		st::test_get_fails_with_non_existing_column(&db)
	}

	#[test]
	fn put_and_get() -> io::Result<()> {
		let db = create(1)?;
		st::test_put_and_get(&db)
	}

	#[test]
	fn delete_and_get() -> io::Result<()> {
		let db = create(1)?;
		st::test_delete_and_get(&db)
	}

	#[test]
	fn delete_prefix() -> io::Result<()> {
		let db = create(st::DELETE_PREFIX_NUM_COLUMNS)?;
		st::test_delete_prefix(&db)
	}

	#[test]
	fn iter() -> io::Result<()> {
		let db = create(1)?;
		st::test_iter(&db)
	}

	#[test]
	fn iter_with_prefix() -> io::Result<()> {
		let db = create(1)?;
		st::test_iter_with_prefix(&db)
	}

	#[test]
	fn complex() -> io::Result<()> {
		let db = create(1)?;
		st::test_complex(&db)
	}

	#[test]
	fn stats() -> io::Result<()> {
		let db = create(st::IO_STATS_NUM_COLUMNS)?;
		st::test_io_stats(&db)
	}

	// A secondary instance opened after a write sees the primary's data.
	#[test]
	fn secondary_db_get() -> io::Result<()> {
		let primary = TempfileBuilder::new().prefix("").tempdir()?;
		let secondary = TempfileBuilder::new().prefix("").tempdir()?;
		let config = DatabaseConfig::with_columns(1);
		let db = Database::open(&config, primary.path()).unwrap();

		let key1 = b"key1";
		let mut transaction = db.transaction();
		transaction.put(0, key1, b"horse");
		db.write(transaction)?;

		let config = DatabaseConfig { secondary: Some(secondary.path().to_owned()), ..DatabaseConfig::with_columns(1) };
		let second_db = Database::open(&config, primary.path()).unwrap();
		assert_eq!(&*second_db.get(0, key1)?.unwrap(), b"horse");
		Ok(())
	}

	// A secondary instance opened before a write sees the data after
	// an explicit `try_catch_up_with_primary`.
	#[test]
	fn secondary_db_catch_up() -> io::Result<()> {
		let primary = TempfileBuilder::new().prefix("").tempdir()?;
		let secondary = TempfileBuilder::new().prefix("").tempdir()?;
		let config = DatabaseConfig::with_columns(1);
		let db = Database::open(&config, primary.path()).unwrap();

		let config = DatabaseConfig { secondary: Some(secondary.path().to_owned()), ..DatabaseConfig::with_columns(1) };
		let second_db = Database::open(&config, primary.path()).unwrap();

		let mut transaction = db.transaction();
		transaction.put(0, b"key1", b"mule");
		transaction.put(0, b"key2", b"cat");
		db.write(transaction)?;

		second_db.try_catch_up_with_primary()?;
		assert_eq!(&*second_db.get(0, b"key2")?.unwrap(), b"cat");
		Ok(())
	}

	#[test]
	#[cfg(target_os = "linux")]
	fn df_to_rotational() {
		use std::path::PathBuf;
		// Example df output.
		// (ASCII bytes of a header line plus a "/dev/sda1 ... /" mount line.)
		let example_df = vec![
			70, 105, 108, 101, 115, 121, 115, 116, 101, 109, 32, 32, 32, 32, 32, 49, 75, 45, 98, 108, 111, 99, 107,
			115, 32, 32, 32, 32, 32, 85, 115, 101, 100, 32, 65, 118, 97, 105, 108, 97, 98, 108, 101, 32, 85, 115, 101,
			37, 32, 77, 111, 117, 110, 116, 101, 100, 32, 111, 110, 10, 47, 100, 101, 118, 47, 115, 100, 97, 49, 32,
			32, 32, 32, 32, 32, 32, 54, 49, 52, 48, 57, 51, 48, 48, 32, 51, 56, 56, 50, 50, 50, 51, 54, 32, 32, 49, 57,
			52, 52, 52, 54, 49, 54, 32, 32, 54, 55, 37, 32, 47, 10,
		];
		let expected_output = Some(PathBuf::from("/sys/block/sda/queue/rotational"));
		assert_eq!(rotational_from_df_output(example_df), expected_output);
	}

	#[test]
	#[should_panic]
	fn db_config_with_zero_columns() {
		let _cfg = DatabaseConfig::with_columns(0);
	}

	#[test]
	#[should_panic]
	fn open_db_with_zero_columns() {
		let cfg = DatabaseConfig { columns: 0, ..Default::default() };
		let _db = Database::open(&cfg, "");
	}

	#[test]
	fn add_columns() {
		let config_1 = DatabaseConfig::default();
		let config_5 = DatabaseConfig::with_columns(5);

		let tempdir = TempfileBuilder::new().prefix("").tempdir().unwrap();

		// open 1, add 4.
		{
			let mut db = Database::open(&config_1, tempdir.path().to_str().unwrap()).unwrap();
			assert_eq!(db.num_columns(), 1);

			for i in 2..=5 {
				db.add_column().unwrap();
				assert_eq!(db.num_columns(), i);
			}
		}

		// reopen as 5.
		{
			let db = Database::open(&config_5, tempdir.path().to_str().unwrap()).unwrap();
			assert_eq!(db.num_columns(), 5);
		}
	}

	#[test]
	fn remove_columns() {
		let config_1 = DatabaseConfig::default();
		let config_5 = DatabaseConfig::with_columns(5);

		let tempdir = TempfileBuilder::new().prefix("drop_columns").tempdir().unwrap();

		// open 5, remove 4.
		{
			let mut db = Database::open(&config_5, tempdir.path()).expect("open with 5 columns");
			assert_eq!(db.num_columns(), 5);

			for i in (1..5).rev() {
				db.remove_last_column().unwrap();
				assert_eq!(db.num_columns(), i);
			}
		}

		// reopen as 1.
		{
			let db = Database::open(&config_1, tempdir.path().to_str().unwrap()).unwrap();
			assert_eq!(db.num_columns(), 1);
		}
	}

	#[test]
	fn test_num_keys() {
		let tempdir = TempfileBuilder::new().prefix("").tempdir().unwrap();
		let config = DatabaseConfig::with_columns(1);
		let db = Database::open(&config, tempdir.path()).unwrap();

		assert_eq!(db.num_keys(0).unwrap(), 0, "database is empty after creation");
		let key1 = b"beef";
		let mut batch = db.transaction();
		batch.put(0, key1, key1);
		db.write(batch).unwrap();
		assert_eq!(db.num_keys(0).unwrap(), 1, "adding a key increases the count");
	}

	#[test]
	fn default_memory_budget() {
		let c = DatabaseConfig::default();
		assert_eq!(c.columns, 1);
		assert_eq!(c.memory_budget(), DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB, "total memory budget is default");
		assert_eq!(
			c.memory_budget_for_col(0),
			DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB,
			"total memory budget for column 0 is the default"
		);
		assert_eq!(
			c.memory_budget_for_col(999),
			DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB,
			"total memory budget for any column is the default"
		);
	}

	#[test]
	fn memory_budget() {
		let mut c = DatabaseConfig::with_columns(3);
		c.memory_budget = [(0, 10), (1, 15), (2, 20)].iter().cloned().collect();
		assert_eq!(c.memory_budget(), 45 * MB, "total budget is the sum of the column budget");
	}

	#[test]
	fn test_stats_parser() {
		let raw = r#"rocksdb.row.cache.hit COUNT : 1
rocksdb.db.get.micros P50 : 2.000000 P95 : 3.000000 P99 : 4.000000 P100 : 5.000000 COUNT : 0 SUM : 15
"#;
		let stats = stats::parse_rocksdb_stats(raw);
		assert_eq!(stats["row.cache.hit"].count, 1);
		assert!(stats["row.cache.hit"].times.is_none());
		assert_eq!(stats["db.get.micros"].count, 0);
		let get_times = stats["db.get.micros"].times.unwrap();
		assert_eq!(get_times.sum, 15);
		assert_eq!(get_times.p50, 2.0);
		assert_eq!(get_times.p95, 3.0);
		assert_eq!(get_times.p99, 4.0);
		assert_eq!(get_times.p100, 5.0);
	}

	// Opens a db with custom settings and asserts that the options actually
	// applied by RocksDB (as dumped into its LOG file) match the config.
	#[test]
	fn rocksdb_settings() {
		const NUM_COLS: usize = 2;
		let mut cfg = DatabaseConfig { enable_statistics: true, ..DatabaseConfig::with_columns(NUM_COLS as u32) };
		cfg.max_open_files = 123; // is capped by the OS fd limit (typically 1024)
		cfg.compaction.block_size = 323232;
		cfg.compaction.initial_file_size = 102030;
		cfg.memory_budget = [(0, 30), (1, 300)].iter().cloned().collect();

		let db_path = TempfileBuilder::new()
			.prefix("config_test")
			.tempdir()
			.expect("the OS can create tmp dirs");
		let db = Database::open(&cfg, db_path.path()).expect("can open a db");
		let statistics = db.get_statistics();
		assert!(statistics.contains_key("block.cache.hit"));
		drop(db);

		let mut rocksdb_log = std::fs::File::open(format!("{}/LOG", db_path.path().to_str().unwrap()))
			.expect("rocksdb creates a LOG file");
		let mut settings = String::new();
		rocksdb_log.read_to_string(&mut settings).unwrap();
		// Check column count
		assert!(settings.contains("Options for column family [default]"), "no default col");
		assert!(settings.contains("Options for column family [col0]"), "no col0");
		assert!(settings.contains("Options for column family [col1]"), "no col1");

		// Check max_open_files
		assert!(settings.contains("max_open_files: 123"));

		// Check block size
		assert!(settings.contains(" block_size: 323232"));

		// LRU cache (default column)
		assert!(settings.contains("block_cache_options:\n    capacity : 115343360"));
		// LRU cache for non-default columns is ⅓ of memory budget (including default column)
		let lru_size = (330 * MB) / 3;
		let needle = format!("block_cache_options:\n    capacity : {}", lru_size);
		let lru = settings.match_indices(&needle).collect::<Vec<_>>().len();
		assert_eq!(lru, NUM_COLS);

		// Index/filters share cache
		let include_indexes = settings.matches("cache_index_and_filter_blocks: 1").collect::<Vec<_>>().len();
		assert_eq!(include_indexes, NUM_COLS);
		// Pin index/filters on L0
		let pins = settings
			.matches("pin_l0_filter_and_index_blocks_in_cache: 1")
			.collect::<Vec<_>>()
			.len();
		assert_eq!(pins, NUM_COLS);

		// Check target file size, aka initial file size
		let l0_sizes = settings.matches("target_file_size_base: 102030").collect::<Vec<_>>().len();
		assert_eq!(l0_sizes, NUM_COLS);
		// The default column uses the default of 64Mb regardless of the setting.
		assert!(settings.contains("target_file_size_base: 67108864"));

		// Check compression settings
		let snappy_compression = settings.matches("Options.compression: Snappy").collect::<Vec<_>>().len();
		// All columns use Snappy
		assert_eq!(snappy_compression, NUM_COLS + 1);
		// …even for L7
		let snappy_bottommost = settings
			.matches("Options.bottommost_compression: Disabled")
			.collect::<Vec<_>>()
			.len();
		assert_eq!(snappy_bottommost, NUM_COLS + 1);

		// 7 levels
		let levels = settings.matches("Options.num_levels: 7").collect::<Vec<_>>().len();
		assert_eq!(levels, NUM_COLS + 1);

		// Don't fsync every store
		assert!(settings.contains("Options.use_fsync: 0"));

		// We're using the new format
		assert!(settings.contains("format_version: 5"));
	}
}