kvdb_rocksdb/
lib.rs

// Copyright 2020 Parity Technologies
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

mod iter;
mod stats;

use std::{
	cmp,
	collections::HashMap,
	error, io,
	path::{Path, PathBuf},
};

use rocksdb::{
	BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Options, ReadOptions, WriteBatch, WriteOptions, DB,
};

use kvdb::{DBKeyValue, DBOp, DBTransaction, DBValue, KeyValueDB};

#[cfg(target_os = "linux")]
use regex::Regex;
#[cfg(target_os = "linux")]
use std::fs::File;
#[cfg(target_os = "linux")]
use std::process::Command;

fn other_io_err<E>(e: E) -> io::Error
where
	E: Into<Box<dyn error::Error + Send + Sync>>,
{
	io::Error::new(io::ErrorKind::Other, e)
}

fn invalid_column(col: u32) -> io::Error {
	other_io_err(format!("No such column family: {col}"))
}

// Used for memory budget.
type MiB = usize;

const KB: usize = 1_024;
const MB: usize = 1_024 * KB;

/// The default column memory budget in MiB.
pub const DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB: MiB = 128;

/// The default memory budget in MiB.
pub const DB_DEFAULT_MEMORY_BUDGET_MB: MiB = 512;

/// Compaction profile for the database settings.
/// Note that changing these parameters may trigger
/// RocksDB to run a compaction on startup.
/// https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is-true
#[derive(Clone, Copy, PartialEq, Debug)]
pub struct CompactionProfile {
	/// L0-L1 target file size.
	/// The minimum size should be calculated in accordance with the
	/// number of levels and the expected size of the database.
	pub initial_file_size: u64,
	/// Block size.
	pub block_size: usize,
}

impl Default for CompactionProfile {
	/// Default profile suitable for most storage.
	fn default() -> CompactionProfile {
		CompactionProfile::ssd()
	}
}

/// Given the output of the `df` command, returns the path of the Linux rotational flag file for the drive.
#[cfg(target_os = "linux")]
pub fn rotational_from_df_output(df_out: Vec<u8>) -> Option<PathBuf> {
	use std::str;
	str::from_utf8(df_out.as_slice())
		.ok()
		// Get the drive name. Note the double brackets: `[[:alpha:]]` is the POSIX class.
		.and_then(|df_str| {
			Regex::new(r"/dev/(sd[[:alpha:]]{1,2})")
				.ok()
				.and_then(|re| re.captures(df_str))
				.and_then(|captures| captures.get(1))
		})
		// Generate path e.g. /sys/block/sda/queue/rotational
		.map(|drive_path| {
			let mut p = PathBuf::from("/sys/block");
			p.push(drive_path.as_str());
			p.push("queue/rotational");
			p
		})
}

impl CompactionProfile {
	/// Attempt to determine the best profile automatically; only implemented for Linux for now.
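	///
	/// # Example
	///
	/// A minimal sketch (not compiled as a doc-test):
	///
	/// ```ignore
	/// // Probes the rotational flag of the drive backing the path and picks
	/// // `ssd()` or `hdd()` accordingly, falling back to the default profile.
	/// let profile = CompactionProfile::auto("/path/to/db");
	/// ```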
	#[cfg(target_os = "linux")]
	pub fn auto<P: AsRef<Path>>(db_path: P) -> CompactionProfile {
		use std::io::Read;
		let hdd_check_file = db_path
			.as_ref()
			.to_str()
			.and_then(|path_str| Command::new("df").arg(path_str).output().ok())
			.and_then(|df_res| if df_res.status.success() { Some(df_res.stdout) } else { None })
			.and_then(rotational_from_df_output);
		// Read out the file and match compaction profile.
		if let Some(hdd_check) = hdd_check_file {
			if let Ok(mut file) = File::open(hdd_check.as_path()) {
				let mut buffer = [0; 1];
				if file.read_exact(&mut buffer).is_ok() {
					// b'0' means not rotational.
					if buffer == [b'0'] {
						return Self::ssd()
					}
					// b'1' means rotational.
					if buffer == [b'1'] {
						return Self::hdd()
					}
				}
			}
		}
		// Fallback if drive type was not determined.
		Self::default()
	}

	/// Just default for other platforms.
	#[cfg(not(target_os = "linux"))]
	pub fn auto<P: AsRef<Path>>(_db_path: P) -> CompactionProfile {
		Self::default()
	}

	/// Default profile suitable for SSD storage.
	pub fn ssd() -> CompactionProfile {
		CompactionProfile { initial_file_size: 64 * MB as u64, block_size: 16 * KB }
	}

	/// Slow HDD compaction profile.
	pub fn hdd() -> CompactionProfile {
		CompactionProfile { initial_file_size: 256 * MB as u64, block_size: 64 * KB }
	}
}

/// Database configuration.
#[derive(Clone)]
#[non_exhaustive]
pub struct DatabaseConfig {
	/// Max number of open files.
	pub max_open_files: i32,
	/// Memory budget (in MiB) used for setting block cache size and
	/// write buffer size for each column including the default one.
	/// If the memory budget of a column is not specified,
	/// `DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB` is used for that column.
	pub memory_budget: HashMap<u32, MiB>,
	/// Compaction profile.
	pub compaction: CompactionProfile,
	/// Set number of columns.
	///
	/// # Safety
	///
	/// The number of columns must not be zero.
	pub columns: u32,
	/// Specify the maximum number of info/debug log files to be kept.
	pub keep_log_file_num: i32,
	/// Enable native RocksDB statistics.
	/// Disabled by default.
	///
	/// It can have a negative performance impact up to 10% according to
	/// https://github.com/facebook/rocksdb/wiki/Statistics.
	pub enable_statistics: bool,
	/// Open the database as a secondary instance.
	/// Specify a path for the secondary instance of the database.
	/// Secondary instances are read-only and kept updated by tailing the RocksDB MANIFEST.
	/// It is up to the user to call `try_catch_up_with_primary()` manually to update the secondary db.
	/// Disabled by default.
	///
	/// `max_open_files` is overridden to always equal `-1`.
	/// May have a negative performance impact on the secondary instance
	/// if the secondary instance reads and applies state changes before the primary instance compacts them.
	/// More info: https://github.com/facebook/rocksdb/wiki/Secondary-instance
	pub secondary: Option<PathBuf>,
	/// Limit the size (in bytes) of write ahead logs.
	/// More info: https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log
	pub max_total_wal_size: Option<u64>,
	/// Creates a new database if no database exists.
	/// Set to `true` by default for backwards compatibility.
	pub create_if_missing: bool,
}

impl DatabaseConfig {
	/// Create new `DatabaseConfig` with default parameters and specified set of columns.
	/// Note that cache sizes must be explicitly set.
	///
	/// # Safety
	///
	/// The number of `columns` must not be zero.
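	///
	/// # Example
	///
	/// A minimal sketch of a two-column config (not compiled as a doc-test):
	///
	/// ```ignore
	/// let mut config = DatabaseConfig::with_columns(2);
	/// // Give column 1 a 256 MiB budget; column 0 keeps the 128 MiB default.
	/// config.memory_budget.insert(1, 256);
	/// ```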
	pub fn with_columns(columns: u32) -> Self {
		assert!(columns > 0, "the number of columns must not be zero");

		Self { columns, ..Default::default() }
	}

	/// Returns the total memory budget in bytes.
	pub fn memory_budget(&self) -> usize {
		(0..self.columns)
			.map(|i| self.memory_budget.get(&i).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB)
			.sum()
	}

	/// Returns the memory budget of the specified column in bytes.
	fn memory_budget_for_col(&self, col: u32) -> usize {
		self.memory_budget.get(&col).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB
	}

	// Get column family configuration with the given block based options.
	fn column_config(&self, block_opts: &BlockBasedOptions, col: u32) -> Options {
		let column_mem_budget = self.memory_budget_for_col(col);
		let mut opts = Options::default();

		opts.set_level_compaction_dynamic_level_bytes(true);
		opts.set_block_based_table_factory(block_opts);
		opts.optimize_level_style_compaction(column_mem_budget);
		opts.set_target_file_size_base(self.compaction.initial_file_size);
		opts.set_compression_per_level(&[]);

		opts
	}
}

impl Default for DatabaseConfig {
	fn default() -> DatabaseConfig {
		DatabaseConfig {
			max_open_files: 512,
			memory_budget: HashMap::new(),
			compaction: CompactionProfile::default(),
			columns: 1,
			keep_log_file_num: 1,
			enable_statistics: false,
			secondary: None,
			max_total_wal_size: None,
			create_if_missing: true,
		}
	}
}

struct DBAndColumns {
	db: DB,
	column_names: Vec<String>,
}

impl DBAndColumns {
	fn cf(&self, i: usize) -> io::Result<&ColumnFamily> {
		let name = self.column_names.get(i).ok_or_else(|| invalid_column(i as u32))?;
		self.db
			.cf_handle(name)
			.ok_or_else(|| other_io_err(format!("invalid column name: {name}")))
	}
}

/// Key-Value database.
pub struct Database {
	inner: DBAndColumns,
	config: DatabaseConfig,
	opts: Options,
	write_opts: WriteOptions,
	read_opts: ReadOptions,
	block_opts: BlockBasedOptions,
	stats: stats::RunningDbStats,
}

/// Generate the options for RocksDB, based on the given `DatabaseConfig`.
fn generate_options(config: &DatabaseConfig) -> Options {
	let mut opts = Options::default();

	opts.set_report_bg_io_stats(true);
	if config.enable_statistics {
		opts.enable_statistics();
	}
	opts.set_use_fsync(false);
	opts.create_if_missing(config.create_if_missing);
	if config.secondary.is_some() {
		opts.set_max_open_files(-1);
	} else {
		opts.set_max_open_files(config.max_open_files);
	}
	opts.set_bytes_per_sync(MB as u64);
	opts.set_keep_log_file_num(config.keep_log_file_num as usize);
	opts.increase_parallelism(cmp::max(1, num_cpus::get() as i32 / 2));
	if let Some(m) = config.max_total_wal_size {
		opts.set_max_total_wal_size(m);
	}

	opts
}

fn generate_read_options() -> ReadOptions {
	let mut read_opts = ReadOptions::default();
	read_opts.set_verify_checksums(false);
	read_opts
}

/// Generate the block based options for RocksDB, based on the given `DatabaseConfig`.
fn generate_block_based_options(config: &DatabaseConfig) -> io::Result<BlockBasedOptions> {
	let mut block_opts = BlockBasedOptions::default();
	block_opts.set_block_size(config.compaction.block_size);
	// See https://github.com/facebook/rocksdb/blob/a1523efcdf2f0e8133b9a9f6e170a0dad49f928f/include/rocksdb/table.h#L246-L271 for details on what the format versions are/do.
	block_opts.set_format_version(5);
	block_opts.set_block_restart_interval(16);
	// Set cache size as recommended by
	// https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#block-cache-size
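	// One third of the total memory budget goes to the block cache, e.g. the default
	// single-column budget of 128 MiB yields a cache of roughly 42 MiB.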
	let cache_size = config.memory_budget() / 3;
	if cache_size == 0 {
		block_opts.disable_cache()
	} else {
		let cache = rocksdb::Cache::new_lru_cache(cache_size);
		block_opts.set_block_cache(&cache);
		// "index and filter blocks will be stored in block cache, together with all other data blocks."
		// See: https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks
		block_opts.set_cache_index_and_filter_blocks(true);
		// Don't evict L0 filter/index blocks from the cache.
		block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
	}
	block_opts.set_bloom_filter(10.0, true);

	Ok(block_opts)
}

impl Database {
	/// Open database file.
	///
	/// # Safety
	///
	/// The number of `config.columns` must not be zero.
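	///
	/// # Example
	///
	/// A minimal sketch, assuming `/tmp/example-db` is writable (not compiled as a doc-test):
	///
	/// ```ignore
	/// let config = DatabaseConfig::with_columns(1);
	/// let db = Database::open(&config, "/tmp/example-db")?;
	/// assert_eq!(db.num_columns(), 1);
	/// ```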
	pub fn open<P: AsRef<Path>>(config: &DatabaseConfig, path: P) -> io::Result<Database> {
		assert!(config.columns > 0, "the number of columns must not be zero");

		let opts = generate_options(config);
		let block_opts = generate_block_based_options(config)?;

		let column_names: Vec<_> = (0..config.columns).map(|c| format!("col{}", c)).collect();
		let write_opts = WriteOptions::default();
		let read_opts = generate_read_options();

		let db = if let Some(secondary_path) = &config.secondary {
			Self::open_secondary(&opts, path.as_ref(), secondary_path.as_ref(), column_names.as_slice())?
		} else {
			let column_names: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
			Self::open_primary(&opts, path.as_ref(), config, column_names.as_slice(), &block_opts)?
		};

		Ok(Database {
			inner: DBAndColumns { db, column_names },
			config: config.clone(),
			opts,
			read_opts,
			write_opts,
			block_opts,
			stats: stats::RunningDbStats::new(),
		})
	}

	/// Internal api to open a database in primary mode.
	fn open_primary<P: AsRef<Path>>(
		opts: &Options,
		path: P,
		config: &DatabaseConfig,
		column_names: &[&str],
		block_opts: &BlockBasedOptions,
	) -> io::Result<rocksdb::DB> {
		let cf_descriptors: Vec<_> = (0..config.columns)
			.map(|i| ColumnFamilyDescriptor::new(column_names[i as usize], config.column_config(block_opts, i)))
			.collect();

		let db = match DB::open_cf_descriptors(opts, path.as_ref(), cf_descriptors) {
			Err(_) => {
				// retry and create CFs
				match DB::open_cf(opts, path.as_ref(), &[] as &[&str]) {
					Ok(mut db) => {
						for (i, name) in column_names.iter().enumerate() {
							let _ = db
								.create_cf(name, &config.column_config(block_opts, i as u32))
								.map_err(other_io_err)?;
						}
						Ok(db)
					},
					err => err,
				}
			},
			ok => ok,
		};

		db.map_err(other_io_err)
	}

	/// Internal api to open a database in secondary mode.
	/// A secondary database needs a separate path to store its own logs.
	fn open_secondary<P: AsRef<Path>>(
		opts: &Options,
		path: P,
		secondary_path: P,
		column_names: &[String],
	) -> io::Result<rocksdb::DB> {
		DB::open_cf_as_secondary(opts, path.as_ref(), secondary_path.as_ref(), column_names)
			.map_err(other_io_err)
	}

	/// Helper to create a new transaction for this database.
	pub fn transaction(&self) -> DBTransaction {
		DBTransaction::new()
	}

	/// Commit transaction to database.
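	///
	/// # Example
	///
	/// A minimal sketch of batching two operations (not compiled as a doc-test):
	///
	/// ```ignore
	/// let mut tx = db.transaction();
	/// tx.put(0, b"key", b"value");
	/// tx.delete(0, b"stale-key");
	/// db.write(tx)?;
	/// ```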
	pub fn write(&self, tr: DBTransaction) -> io::Result<()> {
		let cfs = &self.inner;
		let mut batch = WriteBatch::default();
		let ops = tr.ops;

		self.stats.tally_writes(ops.len() as u64);
		self.stats.tally_transactions(1);

		let mut stats_total_bytes = 0;

		for op in ops {
			let col = op.col();
			let cf = cfs.cf(col as usize)?;

			match op {
				DBOp::Insert { col: _, key, value } => {
					stats_total_bytes += key.len() + value.len();
					batch.put_cf(cf, &key, &value);
				},
				DBOp::Delete { col: _, key } => {
					// We count deletes as writes.
					stats_total_bytes += key.len();
					batch.delete_cf(cf, &key);
				},
				DBOp::DeletePrefix { col, prefix } => {
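					// `kvdb::end_prefix` computes the first key past the range covered by
					// `prefix`, or `None` when the prefix is all `0xff` bytes and no finite
					// upper bound exists; in that case the leftovers are deleted one by one below.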
					let end_prefix = kvdb::end_prefix(&prefix[..]);
					let no_end = end_prefix.is_none();
					let end_range = end_prefix.unwrap_or_else(|| vec![u8::MAX; 16]);
					batch.delete_range_cf(cf, &prefix[..], &end_range[..]);
					if no_end {
						let prefix = if prefix.len() > end_range.len() { &prefix[..] } else { &end_range[..] };
						for result in self.iter_with_prefix(col, prefix) {
							let (key, _) = result?;
							batch.delete_cf(cf, &key[..]);
						}
					}
				},
			};
		}
		self.stats.tally_bytes_written(stats_total_bytes as u64);

		cfs.db.write_opt(batch, &self.write_opts).map_err(other_io_err)
	}

	/// Get value by key.
	pub fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
		let cfs = &self.inner;
		let cf = cfs.cf(col as usize)?;
		self.stats.tally_reads(1);
		let value = cfs
			.db
			.get_pinned_cf_opt(cf, key, &self.read_opts)
			.map(|r| r.map(|v| v.to_vec()))
			.map_err(other_io_err);

		match value {
			Ok(Some(ref v)) => self.stats.tally_bytes_read((key.len() + v.len()) as u64),
			Ok(None) => self.stats.tally_bytes_read(key.len() as u64),
			_ => {},
		};

		value
	}

	/// Get the value associated with the first key that starts with the given prefix.
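	///
	/// # Example
	///
	/// A minimal sketch (not compiled as a doc-test):
	///
	/// ```ignore
	/// // Fetches the value of the lexicographically first key starting with `b"pre"`.
	/// let value = db.get_by_prefix(0, b"pre")?;
	/// ```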
	pub fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> io::Result<Option<DBValue>> {
		self.iter_with_prefix(col, prefix)
			.next()
			.transpose()
			.map(|m| m.map(|(_k, v)| v))
	}

	/// Iterator over the data in the given database column index.
	/// Will hold a lock until the iterator is dropped,
	/// preventing the database from being closed.
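	///
	/// # Example
	///
	/// A minimal sketch of scanning column 0 (not compiled as a doc-test):
	///
	/// ```ignore
	/// for entry in db.iter(0) {
	///     let (key, value) = entry?;
	///     // ...
	/// }
	/// ```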
	pub fn iter<'a>(&'a self, col: u32) -> impl Iterator<Item = io::Result<DBKeyValue>> + 'a {
		let read_opts = generate_read_options();
		iter::IterationHandler::iter(&self.inner, col, read_opts)
	}

	/// Iterator over data in the `col` database column index matching the given prefix.
	/// Will hold a lock until the iterator is dropped,
	/// preventing the database from being closed.
	fn iter_with_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> impl Iterator<Item = io::Result<DBKeyValue>> + 'a {
		let mut read_opts = generate_read_options();
		// rocksdb doesn't work with an empty upper bound.
		if let Some(end_prefix) = kvdb::end_prefix(prefix) {
			read_opts.set_iterate_upper_bound(end_prefix);
		}
		iter::IterationHandler::iter_with_prefix(&self.inner, col, prefix, read_opts)
	}

	/// The number of column families in the db.
	pub fn num_columns(&self) -> u32 {
		self.inner.column_names.len() as u32
	}

	/// The number of keys in a column (estimated).
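	///
	/// # Example
	///
	/// A minimal sketch (not compiled as a doc-test):
	///
	/// ```ignore
	/// // The value comes from RocksDB's `rocksdb.estimate-num-keys` property and is approximate.
	/// let approx_keys = db.num_keys(0)?;
	/// ```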
	pub fn num_keys(&self, col: u32) -> io::Result<u64> {
		const ESTIMATE_NUM_KEYS: &str = "rocksdb.estimate-num-keys";
		let cfs = &self.inner;
		let cf = cfs.cf(col as usize)?;
		match cfs.db.property_int_value_cf(cf, ESTIMATE_NUM_KEYS) {
			Ok(estimate) => Ok(estimate.unwrap_or_default()),
			Err(err_string) => Err(other_io_err(err_string)),
		}
	}

	/// Remove the last column family in the database. The deletion is definitive.
	pub fn remove_last_column(&mut self) -> io::Result<()> {
		let DBAndColumns { ref mut db, ref mut column_names } = self.inner;
		if let Some(name) = column_names.pop() {
			db.drop_cf(&name).map_err(other_io_err)?;
		}
		Ok(())
	}

	/// Add a new column family to the DB.
	pub fn add_column(&mut self) -> io::Result<()> {
		let DBAndColumns { ref mut db, ref mut column_names } = self.inner;
		let col = column_names.len() as u32;
		let name = format!("col{}", col);
		let col_config = self.config.column_config(&self.block_opts, col);
		let _ = db.create_cf(&name, &col_config).map_err(other_io_err)?;
		column_names.push(name);
		Ok(())
	}

	/// Get RocksDB statistics.
	pub fn get_statistics(&self) -> HashMap<String, stats::RocksDbStatsValue> {
		if let Some(stats) = self.opts.get_statistics() {
			stats::parse_rocksdb_stats(&stats)
		} else {
			HashMap::new()
		}
	}

	/// Try to catch up a secondary instance with
	/// the primary by reading as much from the logs as possible.
	///
	/// Guaranteed to have changes up to the time that `try_catch_up_with_primary` is called
	/// if it finishes successfully.
	///
	/// Blocks until the MANIFEST file and any state changes in the corresponding Write-Ahead-Logs
	/// are applied to the secondary instance. If the manifest files are very large
	/// this method could take a long time.
	///
	/// If Write-Ahead-Logs have been purged by the primary instance before the secondary
	/// is able to open them, the secondary will not be caught up
	/// until this function is called again and new Write-Ahead-Logs are identified.
	///
	/// If called while the primary is writing, the catch-up may fail.
	///
	/// If the secondary is unable to catch up because of missing logs,
	/// this method fails silently and no error is returned.
	///
	/// Calling this as primary will return an error.
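	///
	/// # Example
	///
	/// A minimal sketch, assuming a primary instance already lives at `primary_path`
	/// (not compiled as a doc-test):
	///
	/// ```ignore
	/// let config = DatabaseConfig {
	/// 	secondary: Some("/tmp/secondary-logs".into()),
	/// 	..DatabaseConfig::with_columns(1)
	/// };
	/// let secondary = Database::open(&config, primary_path)?;
	/// secondary.try_catch_up_with_primary()?;
	/// ```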
	pub fn try_catch_up_with_primary(&self) -> io::Result<()> {
		self.inner.db.try_catch_up_with_primary().map_err(other_io_err)
	}
}

// The trait methods are duplicated as inherent methods on `Database` (above) so that
// existing callers can use them without importing the `KeyValueDB` trait.
impl KeyValueDB for Database {
	fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
		Database::get(self, col, key)
	}

	fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> io::Result<Option<DBValue>> {
		Database::get_by_prefix(self, col, prefix)
	}

	fn write(&self, transaction: DBTransaction) -> io::Result<()> {
		Database::write(self, transaction)
	}

	fn iter<'a>(&'a self, col: u32) -> Box<dyn Iterator<Item = io::Result<DBKeyValue>> + 'a> {
		let unboxed = Database::iter(self, col);
		Box::new(unboxed.into_iter())
	}

	fn iter_with_prefix<'a>(
		&'a self,
		col: u32,
		prefix: &'a [u8],
	) -> Box<dyn Iterator<Item = io::Result<DBKeyValue>> + 'a> {
		let unboxed = Database::iter_with_prefix(self, col, prefix);
		Box::new(unboxed.into_iter())
	}

	fn io_stats(&self, kind: kvdb::IoStatsKind) -> kvdb::IoStats {
		let rocksdb_stats = self.get_statistics();
		let cache_hit_count = rocksdb_stats.get("block.cache.hit").map(|s| s.count).unwrap_or(0u64);
		let overall_stats = self.stats.overall();
		let old_cache_hit_count = overall_stats.raw.cache_hit_count;

		self.stats.tally_cache_hit_count(cache_hit_count - old_cache_hit_count);

		let taken_stats = match kind {
			kvdb::IoStatsKind::Overall => self.stats.overall(),
			kvdb::IoStatsKind::SincePrevious => self.stats.since_previous(),
		};

		let mut stats = kvdb::IoStats::empty();

		stats.reads = taken_stats.raw.reads;
		stats.writes = taken_stats.raw.writes;
		stats.transactions = taken_stats.raw.transactions;
		stats.bytes_written = taken_stats.raw.bytes_written;
		stats.bytes_read = taken_stats.raw.bytes_read;
		stats.cache_reads = taken_stats.raw.cache_hit_count;
		stats.started = taken_stats.started;
		stats.span = taken_stats.started.elapsed();

		stats
	}
}

#[cfg(test)]
mod tests {
	use super::*;
	use kvdb_shared_tests as st;
	use std::io::{self, Read};
	use tempfile::Builder as TempfileBuilder;

	fn create(columns: u32) -> io::Result<Database> {
		let tempdir = TempfileBuilder::new().prefix("").tempdir()?;
		let config = DatabaseConfig::with_columns(columns);
		Database::open(&config, tempdir.path().to_str().expect("tempdir path is valid unicode"))
	}

	#[test]
	fn get_fails_with_non_existing_column() -> io::Result<()> {
		let db = create(1)?;
		st::test_get_fails_with_non_existing_column(&db)
	}

	#[test]
	fn put_and_get() -> io::Result<()> {
		let db = create(1)?;
		st::test_put_and_get(&db)
	}

	#[test]
	fn delete_and_get() -> io::Result<()> {
		let db = create(1)?;
		st::test_delete_and_get(&db)
	}

	#[test]
	fn delete_prefix() -> io::Result<()> {
		let db = create(st::DELETE_PREFIX_NUM_COLUMNS)?;
		st::test_delete_prefix(&db)
	}

	#[test]
	fn iter() -> io::Result<()> {
		let db = create(1)?;
		st::test_iter(&db)
	}

	#[test]
	fn iter_with_prefix() -> io::Result<()> {
		let db = create(1)?;
		st::test_iter_with_prefix(&db)
	}

	#[test]
	fn complex() -> io::Result<()> {
		let db = create(1)?;
		st::test_complex(&db)
	}

	#[test]
	fn stats() -> io::Result<()> {
		let db = create(st::IO_STATS_NUM_COLUMNS)?;
		st::test_io_stats(&db)
	}

	#[test]
	fn secondary_db_get() -> io::Result<()> {
		let primary = TempfileBuilder::new().prefix("").tempdir()?;
		let secondary = TempfileBuilder::new().prefix("").tempdir()?;
		let config = DatabaseConfig::with_columns(1);
		let db = Database::open(&config, primary.path()).unwrap();

		let key1 = b"key1";
		let mut transaction = db.transaction();
		transaction.put(0, key1, b"horse");
		db.write(transaction)?;

		let config = DatabaseConfig { secondary: Some(secondary.path().to_owned()), ..DatabaseConfig::with_columns(1) };
		let second_db = Database::open(&config, primary.path()).unwrap();
		assert_eq!(&*second_db.get(0, key1)?.unwrap(), b"horse");
		Ok(())
	}

	#[test]
	fn secondary_db_catch_up() -> io::Result<()> {
		let primary = TempfileBuilder::new().prefix("").tempdir()?;
		let secondary = TempfileBuilder::new().prefix("").tempdir()?;
		let config = DatabaseConfig::with_columns(1);
		let db = Database::open(&config, primary.path()).unwrap();

		let config = DatabaseConfig { secondary: Some(secondary.path().to_owned()), ..DatabaseConfig::with_columns(1) };
		let second_db = Database::open(&config, primary.path()).unwrap();

		let mut transaction = db.transaction();
		transaction.put(0, b"key1", b"mule");
		transaction.put(0, b"key2", b"cat");
		db.write(transaction)?;

		second_db.try_catch_up_with_primary()?;
		assert_eq!(&*second_db.get(0, b"key2")?.unwrap(), b"cat");
		Ok(())
	}

	#[test]
	#[cfg(target_os = "linux")]
	fn df_to_rotational() {
		use std::path::PathBuf;
		// Example df output.
		let example_df = b"Filesystem     1K-blocks     Used Available Use% Mounted on\n/dev/sda1       61409300 38822236  19444616  67% /\n".to_vec();
		let expected_output = Some(PathBuf::from("/sys/block/sda/queue/rotational"));
		assert_eq!(rotational_from_df_output(example_df), expected_output);
	}

	#[test]
	#[should_panic]
	fn db_config_with_zero_columns() {
		let _cfg = DatabaseConfig::with_columns(0);
	}

	#[test]
	#[should_panic]
	fn open_db_with_zero_columns() {
		let cfg = DatabaseConfig { columns: 0, ..Default::default() };
		let _db = Database::open(&cfg, "");
	}

	#[test]
	fn add_columns() {
		let config_1 = DatabaseConfig::default();
		let config_5 = DatabaseConfig::with_columns(5);

		let tempdir = TempfileBuilder::new().prefix("").tempdir().unwrap();

		// open 1, add 4.
		{
			let mut db = Database::open(&config_1, tempdir.path().to_str().unwrap()).unwrap();
			assert_eq!(db.num_columns(), 1);

			for i in 2..=5 {
				db.add_column().unwrap();
				assert_eq!(db.num_columns(), i);
			}
		}

		// reopen as 5.
		{
			let db = Database::open(&config_5, tempdir.path().to_str().unwrap()).unwrap();
			assert_eq!(db.num_columns(), 5);
		}
	}

	#[test]
	fn remove_columns() {
		let config_1 = DatabaseConfig::default();
		let config_5 = DatabaseConfig::with_columns(5);

		let tempdir = TempfileBuilder::new().prefix("drop_columns").tempdir().unwrap();

		// open 5, remove 4.
		{
			let mut db = Database::open(&config_5, tempdir.path()).expect("open with 5 columns");
			assert_eq!(db.num_columns(), 5);

			for i in (1..5).rev() {
				db.remove_last_column().unwrap();
				assert_eq!(db.num_columns(), i);
			}
		}

		// reopen as 1.
		{
			let db = Database::open(&config_1, tempdir.path().to_str().unwrap()).unwrap();
			assert_eq!(db.num_columns(), 1);
		}
	}

	#[test]
	fn test_num_keys() {
		let tempdir = TempfileBuilder::new().prefix("").tempdir().unwrap();
		let config = DatabaseConfig::with_columns(1);
		let db = Database::open(&config, tempdir.path()).unwrap();

		assert_eq!(db.num_keys(0).unwrap(), 0, "database is empty after creation");
		let key1 = b"beef";
		let mut batch = db.transaction();
		batch.put(0, key1, key1);
		db.write(batch).unwrap();
		assert_eq!(db.num_keys(0).unwrap(), 1, "adding a key increases the count");
	}

	#[test]
	fn default_memory_budget() {
		let c = DatabaseConfig::default();
		assert_eq!(c.columns, 1);
		assert_eq!(c.memory_budget(), DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB, "total memory budget is default");
		assert_eq!(
			c.memory_budget_for_col(0),
			DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB,
			"total memory budget for column 0 is the default"
		);
		assert_eq!(
			c.memory_budget_for_col(999),
			DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB,
			"total memory budget for any column is the default"
		);
	}

	#[test]
	fn memory_budget() {
		let mut c = DatabaseConfig::with_columns(3);
		c.memory_budget = [(0, 10), (1, 15), (2, 20)].iter().cloned().collect();
		assert_eq!(c.memory_budget(), 45 * MB, "total budget is the sum of the column budgets");
	}

	#[test]
	fn test_stats_parser() {
		let raw = r#"rocksdb.row.cache.hit COUNT : 1
rocksdb.db.get.micros P50 : 2.000000 P95 : 3.000000 P99 : 4.000000 P100 : 5.000000 COUNT : 0 SUM : 15
"#;
		let stats = stats::parse_rocksdb_stats(raw);
		assert_eq!(stats["row.cache.hit"].count, 1);
		assert!(stats["row.cache.hit"].times.is_none());
		assert_eq!(stats["db.get.micros"].count, 0);
		let get_times = stats["db.get.micros"].times.unwrap();
		assert_eq!(get_times.sum, 15);
		assert_eq!(get_times.p50, 2.0);
		assert_eq!(get_times.p95, 3.0);
		assert_eq!(get_times.p99, 4.0);
		assert_eq!(get_times.p100, 5.0);
	}

	#[test]
	fn rocksdb_settings() {
		const NUM_COLS: usize = 2;
		let mut cfg = DatabaseConfig { enable_statistics: true, ..DatabaseConfig::with_columns(NUM_COLS as u32) };
		cfg.max_open_files = 123; // is capped by the OS fd limit (typically 1024)
		cfg.compaction.block_size = 323232;
		cfg.compaction.initial_file_size = 102030;
		cfg.memory_budget = [(0, 30), (1, 300)].iter().cloned().collect();

		let db_path = TempfileBuilder::new()
			.prefix("config_test")
			.tempdir()
			.expect("the OS can create tmp dirs");
		let db = Database::open(&cfg, db_path.path()).expect("can open a db");
		let statistics = db.get_statistics();
		assert!(statistics.contains_key("block.cache.hit"));
		drop(db);

		let mut rocksdb_log = std::fs::File::open(format!("{}/LOG", db_path.path().to_str().unwrap()))
			.expect("rocksdb creates a LOG file");
		let mut settings = String::new();
		rocksdb_log.read_to_string(&mut settings).unwrap();
		// Check column count.
		assert!(settings.contains("Options for column family [default]"), "no default col");
		assert!(settings.contains("Options for column family [col0]"), "no col0");
		assert!(settings.contains("Options for column family [col1]"), "no col1");

		// Check max_open_files.
		assert!(settings.contains("max_open_files: 123"));

		// Check block size.
		assert!(settings.contains(" block_size: 323232"));

		// LRU cache (default column).
		assert!(settings.contains("block_cache_options:\n    capacity : 8388608"));
		// LRU cache for non-default columns is ⅓ of the total memory budget (which includes the default column).
		let lru_size = (330 * MB) / 3;
		let needle = format!("block_cache_options:\n    capacity : {}", lru_size);
		let lru = settings.match_indices(&needle).count();
		assert_eq!(lru, NUM_COLS);

		// Index/filter blocks share the block cache.
		let include_indexes = settings.matches("cache_index_and_filter_blocks: 1").count();
		assert_eq!(include_indexes, NUM_COLS);
		// L0 index/filter blocks are pinned in the cache.
		let pins = settings.matches("pin_l0_filter_and_index_blocks_in_cache: 1").count();
		assert_eq!(pins, NUM_COLS);

		// Check target file size, aka initial file size.
		let l0_sizes = settings.matches("target_file_size_base: 102030").count();
		assert_eq!(l0_sizes, NUM_COLS);
		// The default column uses the default of 64 MiB (67108864 bytes) regardless of the setting.
		assert!(settings.contains("target_file_size_base: 67108864"));

		// Check compression settings.
		let snappy_compression = settings.matches("Options.compression: Snappy").count();
		// All columns use Snappy.
		assert_eq!(snappy_compression, NUM_COLS + 1);
		// …including the bottommost level, where no compression override is set.
		let snappy_bottommost = settings.matches("Options.bottommost_compression: Disabled").count();
		assert_eq!(snappy_bottommost, NUM_COLS + 1);

		// 7 levels.
		let levels = settings.matches("Options.num_levels: 7").count();
		assert_eq!(levels, NUM_COLS + 1);

		// Don't fsync every store.
		assert!(settings.contains("Options.use_fsync: 0"));

		// We're using the new format version.
		assert!(settings.contains("format_version: 5"));
	}
}