1mod iter;
10mod stats;
11
12use std::{
13 cmp,
14 collections::HashMap,
15 error, io,
16 path::{Path, PathBuf},
17};
18
19use rocksdb::{
20 BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Options, ReadOptions, WriteBatch, WriteOptions, DB,
21};
22
23use kvdb::{DBKeyValue, DBOp, DBTransaction, DBValue, KeyValueDB};
24
25#[cfg(target_os = "linux")]
26use regex::Regex;
27#[cfg(target_os = "linux")]
28use std::fs::File;
29#[cfg(target_os = "linux")]
30use std::process::Command;
31
32fn other_io_err<E>(e: E) -> io::Error
33where
34 E: Into<Box<dyn error::Error + Send + Sync>>,
35{
36 io::Error::new(io::ErrorKind::Other, e)
37}
38
39fn invalid_column(col: u32) -> io::Error {
40 other_io_err(format!("No such column family: {:?}", col))
41}
42
43type MiB = usize;
45
46const KB: usize = 1_024;
47const MB: usize = 1_024 * KB;
48
49pub const DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB: MiB = 128;
51
52pub const DB_DEFAULT_MEMORY_BUDGET_MB: MiB = 512;
54
55#[derive(Clone, Copy, PartialEq, Debug)]
60pub struct CompactionProfile {
61 pub initial_file_size: u64,
65 pub block_size: usize,
67}
68
69impl Default for CompactionProfile {
70 fn default() -> CompactionProfile {
72 CompactionProfile::ssd()
73 }
74}
75
76#[cfg(target_os = "linux")]
78pub fn rotational_from_df_output(df_out: Vec<u8>) -> Option<PathBuf> {
79 use std::str;
80 str::from_utf8(df_out.as_slice())
81 .ok()
82 .and_then(|df_str| {
84 Regex::new(r"/dev/(sd[:alpha:]{1,2})")
85 .ok()
86 .and_then(|re| re.captures(df_str))
87 .and_then(|captures| captures.get(1))
88 })
89 .map(|drive_path| {
91 let mut p = PathBuf::from("/sys/block");
92 p.push(drive_path.as_str());
93 p.push("queue/rotational");
94 p
95 })
96}
97
98impl CompactionProfile {
99 #[cfg(target_os = "linux")]
101 pub fn auto<P: AsRef<Path>>(db_path: P) -> CompactionProfile {
102 use std::io::Read;
103 let hdd_check_file = db_path
104 .as_ref()
105 .to_str()
106 .and_then(|path_str| Command::new("df").arg(path_str).output().ok())
107 .and_then(|df_res| if df_res.status.success() { Some(df_res.stdout) } else { None })
108 .and_then(rotational_from_df_output);
109 if let Some(hdd_check) = hdd_check_file {
111 if let Ok(mut file) = File::open(hdd_check.as_path()) {
112 let mut buffer = [0; 1];
113 if file.read_exact(&mut buffer).is_ok() {
114 if buffer == [48] {
116 return Self::ssd()
117 }
118 if buffer == [49] {
120 return Self::hdd()
121 }
122 }
123 }
124 }
125 Self::default()
127 }
128
129 #[cfg(not(target_os = "linux"))]
131 pub fn auto<P: AsRef<Path>>(_db_path: P) -> CompactionProfile {
132 Self::default()
133 }
134
135 pub fn ssd() -> CompactionProfile {
137 CompactionProfile { initial_file_size: 64 * MB as u64, block_size: 16 * KB }
138 }
139
140 pub fn hdd() -> CompactionProfile {
142 CompactionProfile { initial_file_size: 256 * MB as u64, block_size: 64 * KB }
143 }
144}
145
146#[derive(Clone)]
148#[non_exhaustive]
149pub struct DatabaseConfig {
150 pub max_open_files: i32,
152 pub memory_budget: HashMap<u32, MiB>,
157 pub compaction: CompactionProfile,
159 pub columns: u32,
165 pub keep_log_file_num: i32,
167 pub enable_statistics: bool,
173 pub secondary: Option<PathBuf>,
184 pub max_total_wal_size: Option<u64>,
187 pub create_if_missing: bool,
190}
191
192impl DatabaseConfig {
193 pub fn with_columns(columns: u32) -> Self {
200 assert!(columns > 0, "the number of columns must not be zero");
201
202 Self { columns, ..Default::default() }
203 }
204
205 pub fn memory_budget(&self) -> MiB {
207 (0..self.columns)
208 .map(|i| self.memory_budget.get(&i).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB)
209 .sum()
210 }
211
212 fn memory_budget_for_col(&self, col: u32) -> MiB {
214 self.memory_budget.get(&col).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB
215 }
216
217 fn column_config(&self, block_opts: &BlockBasedOptions, col: u32) -> Options {
219 let column_mem_budget = self.memory_budget_for_col(col);
220 let mut opts = Options::default();
221
222 opts.set_level_compaction_dynamic_level_bytes(true);
223 opts.set_block_based_table_factory(block_opts);
224 opts.optimize_level_style_compaction(column_mem_budget);
225 opts.set_target_file_size_base(self.compaction.initial_file_size);
226 opts.set_compression_per_level(&[]);
227
228 opts
229 }
230}
231
232impl Default for DatabaseConfig {
233 fn default() -> DatabaseConfig {
234 DatabaseConfig {
235 max_open_files: 512,
236 memory_budget: HashMap::new(),
237 compaction: CompactionProfile::default(),
238 columns: 1,
239 keep_log_file_num: 1,
240 enable_statistics: false,
241 secondary: None,
242 max_total_wal_size: None,
243 create_if_missing: true,
244 }
245 }
246}
247
248struct DBAndColumns {
249 db: DB,
250 column_names: Vec<String>,
251}
252
253impl DBAndColumns {
254 fn cf(&self, i: usize) -> io::Result<&ColumnFamily> {
255 let name = self.column_names.get(i).ok_or_else(|| invalid_column(i as u32))?;
256 self.db
257 .cf_handle(&name)
258 .ok_or_else(|| other_io_err(format!("invalid column name: {name}")))
259 }
260}
261
262pub struct Database {
264 inner: DBAndColumns,
265 config: DatabaseConfig,
266 opts: Options,
267 write_opts: WriteOptions,
268 read_opts: ReadOptions,
269 block_opts: BlockBasedOptions,
270 stats: stats::RunningDbStats,
271}
272
273fn generate_options(config: &DatabaseConfig) -> Options {
275 let mut opts = Options::default();
276
277 opts.set_report_bg_io_stats(true);
278 if config.enable_statistics {
279 opts.enable_statistics();
280 }
281 opts.set_use_fsync(false);
282 opts.create_if_missing(config.create_if_missing);
283 if config.secondary.is_some() {
284 opts.set_max_open_files(-1)
285 } else {
286 opts.set_max_open_files(config.max_open_files);
287 }
288 opts.set_bytes_per_sync(1 * MB as u64);
289 opts.set_keep_log_file_num(1);
290 opts.increase_parallelism(cmp::max(1, num_cpus::get() as i32 / 2));
291 if let Some(m) = config.max_total_wal_size {
292 opts.set_max_total_wal_size(m);
293 }
294
295 opts
296}
297
298fn generate_read_options() -> ReadOptions {
299 let mut read_opts = ReadOptions::default();
300 read_opts.set_verify_checksums(false);
301 read_opts
302}
303
304fn generate_block_based_options(config: &DatabaseConfig) -> io::Result<BlockBasedOptions> {
306 let mut block_opts = BlockBasedOptions::default();
307 block_opts.set_block_size(config.compaction.block_size);
308 block_opts.set_format_version(5);
310 block_opts.set_block_restart_interval(16);
311 let cache_size = config.memory_budget() / 3;
314 if cache_size == 0 {
315 block_opts.disable_cache()
316 } else {
317 let cache = rocksdb::Cache::new_lru_cache(cache_size);
318 block_opts.set_block_cache(&cache);
319 block_opts.set_cache_index_and_filter_blocks(true);
322 block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
324 }
325 block_opts.set_bloom_filter(10.0, true);
326
327 Ok(block_opts)
328}
329
330impl Database {
331 pub fn open<P: AsRef<Path>>(config: &DatabaseConfig, path: P) -> io::Result<Database> {
337 assert!(config.columns > 0, "the number of columns must not be zero");
338
339 let opts = generate_options(config);
340 let block_opts = generate_block_based_options(config)?;
341
342 let column_names: Vec<_> = (0..config.columns).map(|c| format!("col{}", c)).collect();
343 let write_opts = WriteOptions::default();
344 let read_opts = generate_read_options();
345
346 let db = if let Some(secondary_path) = &config.secondary {
347 Self::open_secondary(&opts, path.as_ref(), secondary_path.as_ref(), column_names.as_slice())?
348 } else {
349 let column_names: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
350 Self::open_primary(&opts, path.as_ref(), config, column_names.as_slice(), &block_opts)?
351 };
352
353 Ok(Database {
354 inner: DBAndColumns { db, column_names },
355 config: config.clone(),
356 opts,
357 read_opts,
358 write_opts,
359 block_opts,
360 stats: stats::RunningDbStats::new(),
361 })
362 }
363
364 fn open_primary<P: AsRef<Path>>(
366 opts: &Options,
367 path: P,
368 config: &DatabaseConfig,
369 column_names: &[&str],
370 block_opts: &BlockBasedOptions,
371 ) -> io::Result<rocksdb::DB> {
372 let cf_descriptors: Vec<_> = (0..config.columns)
373 .map(|i| ColumnFamilyDescriptor::new(column_names[i as usize], config.column_config(&block_opts, i)))
374 .collect();
375
376 let db = match DB::open_cf_descriptors(&opts, path.as_ref(), cf_descriptors) {
377 Err(_) => {
378 match DB::open_cf(&opts, path.as_ref(), &[] as &[&str]) {
380 Ok(mut db) => {
381 for (i, name) in column_names.iter().enumerate() {
382 let _ = db
383 .create_cf(name, &config.column_config(&block_opts, i as u32))
384 .map_err(other_io_err)?;
385 }
386 Ok(db)
387 },
388 err => err,
389 }
390 },
391 ok => ok,
392 };
393
394 Ok(match db {
395 Ok(db) => db,
396 Err(s) => return Err(other_io_err(s)),
397 })
398 }
399
400 fn open_secondary<P: AsRef<Path>>(
403 opts: &Options,
404 path: P,
405 secondary_path: P,
406 column_names: &[String],
407 ) -> io::Result<rocksdb::DB> {
408 let db = DB::open_cf_as_secondary(&opts, path.as_ref(), secondary_path.as_ref(), column_names);
409
410 Ok(match db {
411 Ok(db) => db,
412 Err(s) => return Err(other_io_err(s)),
413 })
414 }
415
416 pub fn transaction(&self) -> DBTransaction {
418 DBTransaction::new()
419 }
420
421 pub fn write(&self, tr: DBTransaction) -> io::Result<()> {
423 let cfs = &self.inner;
424 let mut batch = WriteBatch::default();
425 let ops = tr.ops;
426
427 self.stats.tally_writes(ops.len() as u64);
428 self.stats.tally_transactions(1);
429
430 let mut stats_total_bytes = 0;
431
432 for op in ops {
433 let col = op.col();
434 let cf = cfs.cf(col as usize)?;
435
436 match op {
437 DBOp::Insert { col: _, key, value } => {
438 stats_total_bytes += key.len() + value.len();
439 batch.put_cf(cf, &key, &value);
440 },
441 DBOp::Delete { col: _, key } => {
442 stats_total_bytes += key.len();
444 batch.delete_cf(cf, &key);
445 },
446 DBOp::DeletePrefix { col, prefix } => {
447 let end_prefix = kvdb::end_prefix(&prefix[..]);
448 let no_end = end_prefix.is_none();
449 let end_range = end_prefix.unwrap_or_else(|| vec![u8::max_value(); 16]);
450 batch.delete_range_cf(cf, &prefix[..], &end_range[..]);
451 if no_end {
452 let prefix = if prefix.len() > end_range.len() { &prefix[..] } else { &end_range[..] };
453 for result in self.iter_with_prefix(col, prefix) {
454 let (key, _) = result?;
455 batch.delete_cf(cf, &key[..]);
456 }
457 }
458 },
459 };
460 }
461 self.stats.tally_bytes_written(stats_total_bytes as u64);
462
463 cfs.db.write_opt(batch, &self.write_opts).map_err(other_io_err)
464 }
465
466 pub fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
468 let cfs = &self.inner;
469 let cf = cfs.cf(col as usize)?;
470 self.stats.tally_reads(1);
471 let value = cfs
472 .db
473 .get_pinned_cf_opt(cf, key, &self.read_opts)
474 .map(|r| r.map(|v| v.to_vec()))
475 .map_err(other_io_err);
476
477 match value {
478 Ok(Some(ref v)) => self.stats.tally_bytes_read((key.len() + v.len()) as u64),
479 Ok(None) => self.stats.tally_bytes_read(key.len() as u64),
480 _ => {},
481 };
482
483 value
484 }
485
486 pub fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> io::Result<Option<DBValue>> {
488 self.iter_with_prefix(col, prefix)
489 .next()
490 .transpose()
491 .map(|m| m.map(|(_k, v)| v))
492 }
493
494 pub fn iter<'a>(&'a self, col: u32) -> impl Iterator<Item = io::Result<DBKeyValue>> + 'a {
498 let read_opts = generate_read_options();
499 iter::IterationHandler::iter(&self.inner, col, read_opts)
500 }
501
502 fn iter_with_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> impl Iterator<Item = io::Result<DBKeyValue>> + 'a {
506 let mut read_opts = generate_read_options();
507 if let Some(end_prefix) = kvdb::end_prefix(prefix) {
509 read_opts.set_iterate_upper_bound(end_prefix);
510 }
511 iter::IterationHandler::iter_with_prefix(&self.inner, col, prefix, read_opts)
512 }
513
514 pub fn num_columns(&self) -> u32 {
516 self.inner.column_names.len() as u32
517 }
518
519 pub fn num_keys(&self, col: u32) -> io::Result<u64> {
521 const ESTIMATE_NUM_KEYS: &str = "rocksdb.estimate-num-keys";
522 let cfs = &self.inner;
523 let cf = cfs.cf(col as usize)?;
524 match cfs.db.property_int_value_cf(cf, ESTIMATE_NUM_KEYS) {
525 Ok(estimate) => Ok(estimate.unwrap_or_default()),
526 Err(err_string) => Err(other_io_err(err_string)),
527 }
528 }
529
530 pub fn remove_last_column(&mut self) -> io::Result<()> {
532 let DBAndColumns { ref mut db, ref mut column_names } = self.inner;
533 if let Some(name) = column_names.pop() {
534 db.drop_cf(&name).map_err(other_io_err)?;
535 }
536 Ok(())
537 }
538
539 pub fn add_column(&mut self) -> io::Result<()> {
541 let DBAndColumns { ref mut db, ref mut column_names } = self.inner;
542 let col = column_names.len() as u32;
543 let name = format!("col{}", col);
544 let col_config = self.config.column_config(&self.block_opts, col as u32);
545 let _ = db.create_cf(&name, &col_config).map_err(other_io_err)?;
546 column_names.push(name);
547 Ok(())
548 }
549
550 pub fn get_statistics(&self) -> HashMap<String, stats::RocksDbStatsValue> {
552 if let Some(stats) = self.opts.get_statistics() {
553 stats::parse_rocksdb_stats(&stats)
554 } else {
555 HashMap::new()
556 }
557 }
558
559 pub fn try_catch_up_with_primary(&self) -> io::Result<()> {
580 self.inner.db.try_catch_up_with_primary().map_err(other_io_err)
581 }
582}
583
584impl KeyValueDB for Database {
587 fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
588 Database::get(self, col, key)
589 }
590
591 fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> io::Result<Option<DBValue>> {
592 Database::get_by_prefix(self, col, prefix)
593 }
594
595 fn write(&self, transaction: DBTransaction) -> io::Result<()> {
596 Database::write(self, transaction)
597 }
598
599 fn iter<'a>(&'a self, col: u32) -> Box<dyn Iterator<Item = io::Result<DBKeyValue>> + 'a> {
600 let unboxed = Database::iter(self, col);
601 Box::new(unboxed.into_iter())
602 }
603
604 fn iter_with_prefix<'a>(
605 &'a self,
606 col: u32,
607 prefix: &'a [u8],
608 ) -> Box<dyn Iterator<Item = io::Result<DBKeyValue>> + 'a> {
609 let unboxed = Database::iter_with_prefix(self, col, prefix);
610 Box::new(unboxed.into_iter())
611 }
612
613 fn io_stats(&self, kind: kvdb::IoStatsKind) -> kvdb::IoStats {
614 let rocksdb_stats = self.get_statistics();
615 let cache_hit_count = rocksdb_stats.get("block.cache.hit").map(|s| s.count).unwrap_or(0u64);
616 let overall_stats = self.stats.overall();
617 let old_cache_hit_count = overall_stats.raw.cache_hit_count;
618
619 self.stats.tally_cache_hit_count(cache_hit_count - old_cache_hit_count);
620
621 let taken_stats = match kind {
622 kvdb::IoStatsKind::Overall => self.stats.overall(),
623 kvdb::IoStatsKind::SincePrevious => self.stats.since_previous(),
624 };
625
626 let mut stats = kvdb::IoStats::empty();
627
628 stats.reads = taken_stats.raw.reads;
629 stats.writes = taken_stats.raw.writes;
630 stats.transactions = taken_stats.raw.transactions;
631 stats.bytes_written = taken_stats.raw.bytes_written;
632 stats.bytes_read = taken_stats.raw.bytes_read;
633 stats.cache_reads = taken_stats.raw.cache_hit_count;
634 stats.started = taken_stats.started;
635 stats.span = taken_stats.started.elapsed();
636
637 stats
638 }
639}
640
641#[cfg(test)]
642mod tests {
643 use super::*;
644 use kvdb_shared_tests as st;
645 use std::io::{self, Read};
646 use tempfile::Builder as TempfileBuilder;
647
648 fn create(columns: u32) -> io::Result<Database> {
649 let tempdir = TempfileBuilder::new().prefix("").tempdir()?;
650 let config = DatabaseConfig::with_columns(columns);
651 Database::open(&config, tempdir.path().to_str().expect("tempdir path is valid unicode"))
652 }
653
654 #[test]
655 fn get_fails_with_non_existing_column() -> io::Result<()> {
656 let db = create(1)?;
657 st::test_get_fails_with_non_existing_column(&db)
658 }
659
660 #[test]
661 fn put_and_get() -> io::Result<()> {
662 let db = create(1)?;
663 st::test_put_and_get(&db)
664 }
665
666 #[test]
667 fn delete_and_get() -> io::Result<()> {
668 let db = create(1)?;
669 st::test_delete_and_get(&db)
670 }
671
672 #[test]
673 fn delete_prefix() -> io::Result<()> {
674 let db = create(st::DELETE_PREFIX_NUM_COLUMNS)?;
675 st::test_delete_prefix(&db)
676 }
677
678 #[test]
679 fn iter() -> io::Result<()> {
680 let db = create(1)?;
681 st::test_iter(&db)
682 }
683
684 #[test]
685 fn iter_with_prefix() -> io::Result<()> {
686 let db = create(1)?;
687 st::test_iter_with_prefix(&db)
688 }
689
690 #[test]
691 fn complex() -> io::Result<()> {
692 let db = create(1)?;
693 st::test_complex(&db)
694 }
695
696 #[test]
697 fn stats() -> io::Result<()> {
698 let db = create(st::IO_STATS_NUM_COLUMNS)?;
699 st::test_io_stats(&db)
700 }
701
702 #[test]
703 fn secondary_db_get() -> io::Result<()> {
704 let primary = TempfileBuilder::new().prefix("").tempdir()?;
705 let secondary = TempfileBuilder::new().prefix("").tempdir()?;
706 let config = DatabaseConfig::with_columns(1);
707 let db = Database::open(&config, primary.path()).unwrap();
708
709 let key1 = b"key1";
710 let mut transaction = db.transaction();
711 transaction.put(0, key1, b"horse");
712 db.write(transaction)?;
713
714 let config = DatabaseConfig { secondary: Some(secondary.path().to_owned()), ..DatabaseConfig::with_columns(1) };
715 let second_db = Database::open(&config, primary.path()).unwrap();
716 assert_eq!(&*second_db.get(0, key1)?.unwrap(), b"horse");
717 Ok(())
718 }
719
720 #[test]
721 fn secondary_db_catch_up() -> io::Result<()> {
722 let primary = TempfileBuilder::new().prefix("").tempdir()?;
723 let secondary = TempfileBuilder::new().prefix("").tempdir()?;
724 let config = DatabaseConfig::with_columns(1);
725 let db = Database::open(&config, primary.path()).unwrap();
726
727 let config = DatabaseConfig { secondary: Some(secondary.path().to_owned()), ..DatabaseConfig::with_columns(1) };
728 let second_db = Database::open(&config, primary.path()).unwrap();
729
730 let mut transaction = db.transaction();
731 transaction.put(0, b"key1", b"mule");
732 transaction.put(0, b"key2", b"cat");
733 db.write(transaction)?;
734
735 second_db.try_catch_up_with_primary()?;
736 assert_eq!(&*second_db.get(0, b"key2")?.unwrap(), b"cat");
737 Ok(())
738 }
739
740 #[test]
741 #[cfg(target_os = "linux")]
742 fn df_to_rotational() {
743 use std::path::PathBuf;
744 let example_df = vec![
746 70, 105, 108, 101, 115, 121, 115, 116, 101, 109, 32, 32, 32, 32, 32, 49, 75, 45, 98, 108, 111, 99, 107,
747 115, 32, 32, 32, 32, 32, 85, 115, 101, 100, 32, 65, 118, 97, 105, 108, 97, 98, 108, 101, 32, 85, 115, 101,
748 37, 32, 77, 111, 117, 110, 116, 101, 100, 32, 111, 110, 10, 47, 100, 101, 118, 47, 115, 100, 97, 49, 32,
749 32, 32, 32, 32, 32, 32, 54, 49, 52, 48, 57, 51, 48, 48, 32, 51, 56, 56, 50, 50, 50, 51, 54, 32, 32, 49, 57,
750 52, 52, 52, 54, 49, 54, 32, 32, 54, 55, 37, 32, 47, 10,
751 ];
752 let expected_output = Some(PathBuf::from("/sys/block/sda/queue/rotational"));
753 assert_eq!(rotational_from_df_output(example_df), expected_output);
754 }
755
756 #[test]
757 #[should_panic]
758 fn db_config_with_zero_columns() {
759 let _cfg = DatabaseConfig::with_columns(0);
760 }
761
762 #[test]
763 #[should_panic]
764 fn open_db_with_zero_columns() {
765 let cfg = DatabaseConfig { columns: 0, ..Default::default() };
766 let _db = Database::open(&cfg, "");
767 }
768
769 #[test]
770 fn add_columns() {
771 let config_1 = DatabaseConfig::default();
772 let config_5 = DatabaseConfig::with_columns(5);
773
774 let tempdir = TempfileBuilder::new().prefix("").tempdir().unwrap();
775
776 {
778 let mut db = Database::open(&config_1, tempdir.path().to_str().unwrap()).unwrap();
779 assert_eq!(db.num_columns(), 1);
780
781 for i in 2..=5 {
782 db.add_column().unwrap();
783 assert_eq!(db.num_columns(), i);
784 }
785 }
786
787 {
789 let db = Database::open(&config_5, tempdir.path().to_str().unwrap()).unwrap();
790 assert_eq!(db.num_columns(), 5);
791 }
792 }
793
794 #[test]
795 fn remove_columns() {
796 let config_1 = DatabaseConfig::default();
797 let config_5 = DatabaseConfig::with_columns(5);
798
799 let tempdir = TempfileBuilder::new().prefix("drop_columns").tempdir().unwrap();
800
801 {
803 let mut db = Database::open(&config_5, tempdir.path()).expect("open with 5 columns");
804 assert_eq!(db.num_columns(), 5);
805
806 for i in (1..5).rev() {
807 db.remove_last_column().unwrap();
808 assert_eq!(db.num_columns(), i);
809 }
810 }
811
812 {
814 let db = Database::open(&config_1, tempdir.path().to_str().unwrap()).unwrap();
815 assert_eq!(db.num_columns(), 1);
816 }
817 }
818
819 #[test]
820 fn test_num_keys() {
821 let tempdir = TempfileBuilder::new().prefix("").tempdir().unwrap();
822 let config = DatabaseConfig::with_columns(1);
823 let db = Database::open(&config, tempdir.path()).unwrap();
824
825 assert_eq!(db.num_keys(0).unwrap(), 0, "database is empty after creation");
826 let key1 = b"beef";
827 let mut batch = db.transaction();
828 batch.put(0, key1, key1);
829 db.write(batch).unwrap();
830 assert_eq!(db.num_keys(0).unwrap(), 1, "adding a key increases the count");
831 }
832
833 #[test]
834 fn default_memory_budget() {
835 let c = DatabaseConfig::default();
836 assert_eq!(c.columns, 1);
837 assert_eq!(c.memory_budget(), DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB, "total memory budget is default");
838 assert_eq!(
839 c.memory_budget_for_col(0),
840 DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB,
841 "total memory budget for column 0 is the default"
842 );
843 assert_eq!(
844 c.memory_budget_for_col(999),
845 DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB,
846 "total memory budget for any column is the default"
847 );
848 }
849
850 #[test]
851 fn memory_budget() {
852 let mut c = DatabaseConfig::with_columns(3);
853 c.memory_budget = [(0, 10), (1, 15), (2, 20)].iter().cloned().collect();
854 assert_eq!(c.memory_budget(), 45 * MB, "total budget is the sum of the column budget");
855 }
856
857 #[test]
858 fn test_stats_parser() {
859 let raw = r#"rocksdb.row.cache.hit COUNT : 1
860rocksdb.db.get.micros P50 : 2.000000 P95 : 3.000000 P99 : 4.000000 P100 : 5.000000 COUNT : 0 SUM : 15
861"#;
862 let stats = stats::parse_rocksdb_stats(raw);
863 assert_eq!(stats["row.cache.hit"].count, 1);
864 assert!(stats["row.cache.hit"].times.is_none());
865 assert_eq!(stats["db.get.micros"].count, 0);
866 let get_times = stats["db.get.micros"].times.unwrap();
867 assert_eq!(get_times.sum, 15);
868 assert_eq!(get_times.p50, 2.0);
869 assert_eq!(get_times.p95, 3.0);
870 assert_eq!(get_times.p99, 4.0);
871 assert_eq!(get_times.p100, 5.0);
872 }
873
874 #[test]
875 fn rocksdb_settings() {
876 const NUM_COLS: usize = 2;
877 let mut cfg = DatabaseConfig { enable_statistics: true, ..DatabaseConfig::with_columns(NUM_COLS as u32) };
878 cfg.max_open_files = 123; cfg.compaction.block_size = 323232;
880 cfg.compaction.initial_file_size = 102030;
881 cfg.memory_budget = [(0, 30), (1, 300)].iter().cloned().collect();
882
883 let db_path = TempfileBuilder::new()
884 .prefix("config_test")
885 .tempdir()
886 .expect("the OS can create tmp dirs");
887 let db = Database::open(&cfg, db_path.path()).expect("can open a db");
888 let statistics = db.get_statistics();
889 assert!(statistics.contains_key("block.cache.hit"));
890 drop(db);
891
892 let mut rocksdb_log = std::fs::File::open(format!("{}/LOG", db_path.path().to_str().unwrap()))
893 .expect("rocksdb creates a LOG file");
894 let mut settings = String::new();
895 rocksdb_log.read_to_string(&mut settings).unwrap();
896 assert!(settings.contains("Options for column family [default]"), "no default col");
898 assert!(settings.contains("Options for column family [col0]"), "no col0");
899 assert!(settings.contains("Options for column family [col1]"), "no col1");
900
901 assert!(settings.contains("max_open_files: 123"));
903
904 assert!(settings.contains(" block_size: 323232"));
906
907 assert!(settings.contains("block_cache_options:\n capacity : 8388608"));
909 let lru_size = (330 * MB) / 3;
911 let needle = format!("block_cache_options:\n capacity : {}", lru_size);
912 let lru = settings.match_indices(&needle).collect::<Vec<_>>().len();
913 assert_eq!(lru, NUM_COLS);
914
915 let include_indexes = settings.matches("cache_index_and_filter_blocks: 1").collect::<Vec<_>>().len();
917 assert_eq!(include_indexes, NUM_COLS);
918 let pins = settings
920 .matches("pin_l0_filter_and_index_blocks_in_cache: 1")
921 .collect::<Vec<_>>()
922 .len();
923 assert_eq!(pins, NUM_COLS);
924
925 let l0_sizes = settings.matches("target_file_size_base: 102030").collect::<Vec<_>>().len();
927 assert_eq!(l0_sizes, NUM_COLS);
928 assert!(settings.contains("target_file_size_base: 67108864"));
930
931 let snappy_compression = settings.matches("Options.compression: Snappy").collect::<Vec<_>>().len();
933 assert_eq!(snappy_compression, NUM_COLS + 1);
935 let snappy_bottommost = settings
937 .matches("Options.bottommost_compression: Disabled")
938 .collect::<Vec<_>>()
939 .len();
940 assert_eq!(snappy_bottommost, NUM_COLS + 1);
941
942 let levels = settings.matches("Options.num_levels: 7").collect::<Vec<_>>().len();
944 assert_eq!(levels, NUM_COLS + 1);
945
946 assert!(settings.contains("Options.use_fsync: 0"));
948
949 assert!(settings.contains("format_version: 5"));
951 }
952}