mod iter;
mod stats;

use std::{
    cmp,
    collections::HashMap,
    error, io,
    path::{Path, PathBuf},
};

use rocksdb::{
    BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, CompactOptions, Options, ReadOptions, WriteBatch,
    WriteOptions, DB,
};

use kvdb::{DBKeyValue, DBOp, DBTransaction, DBValue, KeyValueDB};

#[cfg(target_os = "linux")]
use regex::Regex;
#[cfg(target_os = "linux")]
use std::fs::File;
#[cfg(target_os = "linux")]
use std::process::Command;

fn other_io_err<E>(e: E) -> io::Error
where
    E: Into<Box<dyn error::Error + Send + Sync>>,
{
    io::Error::new(io::ErrorKind::Other, e)
}

fn invalid_column(col: u32) -> io::Error {
    other_io_err(format!("No such column family: {:?}", col))
}

/// A value expressed in mebibytes (MiB).
type MiB = usize;

const KB: usize = 1_024;
const MB: usize = 1_024 * KB;

/// Default memory budget per column, in MiB.
pub const DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB: MiB = 128;

/// Default total memory budget for the database, in MiB.
pub const DB_DEFAULT_MEMORY_BUDGET_MB: MiB = 512;

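/// Compaction profile for the database settings, tuned either for SSDs or
/// spinning disks.
///
/// A minimal usage sketch (paths assume this file builds as the
/// `kvdb_rocksdb` crate and are illustrative only); `auto` probes the storage
/// type on Linux and falls back to the SSD preset elsewhere or on failure.
///
/// ```no_run
/// use kvdb_rocksdb::{CompactionProfile, Database, DatabaseConfig};
///
/// let mut config = DatabaseConfig::with_columns(1);
/// config.compaction = CompactionProfile::auto("/tmp/example-db");
/// let _db = Database::open(&config, "/tmp/example-db").unwrap();
/// ```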
#[derive(Clone, Copy, PartialEq, Debug)]
pub struct CompactionProfile {
    /// Initial compaction file size (used as RocksDB's target file size base), in bytes.
    pub initial_file_size: u64,
    /// Block size used by the block-based table factory, in bytes.
    pub block_size: usize,
}

impl Default for CompactionProfile {
    /// Defaults to the SSD profile.
    fn default() -> CompactionProfile {
        CompactionProfile::ssd()
    }
}

/// Given the output of `df`, returns the sysfs path that reports whether the
/// backing block device is rotational (e.g. `/sys/block/sda/queue/rotational`).
#[cfg(target_os = "linux")]
pub fn rotational_from_df_output(df_out: Vec<u8>) -> Option<PathBuf> {
    use std::str;
    str::from_utf8(df_out.as_slice())
        .ok()
        // Get the device name (e.g. "sda") from the df output.
        .and_then(|df_str| {
            Regex::new(r"/dev/(sd[[:alpha:]]{1,2})")
                .ok()
                .and_then(|re| re.captures(df_str))
                .and_then(|captures| captures.get(1))
        })
        // Build the path to the rotational flag of that device.
        .map(|drive_path| {
            let mut p = PathBuf::from("/sys/block");
            p.push(drive_path.as_str());
            p.push("queue/rotational");
            p
        })
}

impl CompactionProfile {
    /// Attempt to determine the best profile automatically. Only supported on
    /// Linux; falls back to the default (SSD) profile on failure.
    #[cfg(target_os = "linux")]
    pub fn auto<P: AsRef<Path>>(db_path: P) -> CompactionProfile {
        use std::io::Read;
        let hdd_check_file = db_path
            .as_ref()
            .to_str()
            .and_then(|path_str| Command::new("df").arg(path_str).output().ok())
            .and_then(|df_res| if df_res.status.success() { Some(df_res.stdout) } else { None })
            .and_then(rotational_from_df_output);
        // Read out the rotational flag of the device the database lives on.
        if let Some(hdd_check) = hdd_check_file {
            if let Ok(mut file) = File::open(hdd_check.as_path()) {
                let mut buffer = [0; 1];
                if file.read_exact(&mut buffer).is_ok() {
                    // ASCII '0' (48) means the device is not rotational (SSD).
                    if buffer == [48] {
                        return Self::ssd()
                    }
                    // ASCII '1' (49) means the device is rotational (HDD).
                    if buffer == [49] {
                        return Self::hdd()
                    }
                }
            }
        }
        // Fall back if drive type detection failed.
        Self::default()
    }

    /// Just use the default profile on non-Linux platforms.
    #[cfg(not(target_os = "linux"))]
    pub fn auto<P: AsRef<Path>>(_db_path: P) -> CompactionProfile {
        Self::default()
    }

    /// Default profile suitable for SSD storage.
    pub fn ssd() -> CompactionProfile {
        CompactionProfile { initial_file_size: 64 * MB as u64, block_size: 16 * KB }
    }

    /// Profile suitable for slower, spinning HDD storage.
    pub fn hdd() -> CompactionProfile {
        CompactionProfile { initial_file_size: 256 * MB as u64, block_size: 64 * KB }
    }
}

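/// Database configuration.
///
/// A minimal sketch of tuning the per-column memory budget (paths assume this
/// file builds as the `kvdb_rocksdb` crate; the budget values are illustrative
/// only):
///
/// ```no_run
/// use kvdb_rocksdb::DatabaseConfig;
///
/// let mut config = DatabaseConfig::with_columns(2);
/// // Column 0 gets a 64 MiB budget; column 1 keeps the 128 MiB default.
/// config.memory_budget.insert(0, 64);
/// assert_eq!(config.memory_budget(), (64 + 128) * 1024 * 1024);
/// ```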
#[derive(Clone)]
#[non_exhaustive]
pub struct DatabaseConfig {
    /// Max number of open files.
    pub max_open_files: i32,
    /// Per-column memory budget, in MiB. Columns without an entry fall back to
    /// `DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB`.
    pub memory_budget: HashMap<u32, MiB>,
    /// Compaction profile.
    pub compaction: CompactionProfile,
    /// Number of columns; must not be zero.
    pub columns: u32,
    /// Maximum number of info/debug log files to keep.
    pub keep_log_file_num: i32,
    /// Enable RocksDB statistics collection (may degrade performance); disabled by default.
    pub enable_statistics: bool,
    /// Open the database as a read-only secondary instance, keeping its own
    /// files at this path. A secondary instance can catch up with the primary
    /// via `try_catch_up_with_primary`.
    pub secondary: Option<PathBuf>,
    /// Limit on the total size of write-ahead logs, in bytes.
    pub max_total_wal_size: Option<u64>,
    /// Create a new database if none exists at the given path.
    pub create_if_missing: bool,
}

impl DatabaseConfig {
    /// Create a new `DatabaseConfig` with default parameters and the specified
    /// number of columns.
    ///
    /// # Panics
    ///
    /// Panics if `columns` is zero.
    pub fn with_columns(columns: u32) -> Self {
        assert!(columns > 0, "the number of columns must not be zero");

        Self { columns, ..Default::default() }
    }

    /// Returns the total memory budget across all columns, in bytes.
    pub fn memory_budget(&self) -> MiB {
        (0..self.columns)
            .map(|i| self.memory_budget.get(&i).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB)
            .sum()
    }

    /// Returns the memory budget of the specified column, in bytes.
    fn memory_budget_for_col(&self, col: u32) -> MiB {
        self.memory_budget.get(&col).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB
    }

    /// Build the per-column RocksDB options from this config.
    fn column_config(&self, block_opts: &BlockBasedOptions, col: u32) -> Options {
        let column_mem_budget = self.memory_budget_for_col(col);
        let mut opts = Options::default();

        opts.set_level_compaction_dynamic_level_bytes(true);
        opts.set_block_based_table_factory(block_opts);
        opts.optimize_level_style_compaction(column_mem_budget);
        opts.set_target_file_size_base(self.compaction.initial_file_size);
        opts.set_compression_per_level(&[]);

        opts
    }
}

impl Default for DatabaseConfig {
    fn default() -> DatabaseConfig {
        DatabaseConfig {
            max_open_files: 512,
            memory_budget: HashMap::new(),
            compaction: CompactionProfile::default(),
            columns: 1,
            keep_log_file_num: 1,
            enable_statistics: false,
            secondary: None,
            max_total_wal_size: None,
            create_if_missing: true,
        }
    }
}

struct DBAndColumns {
    db: DB,
    column_names: Vec<String>,
}

impl DBAndColumns {
    fn cf(&self, i: usize) -> io::Result<&ColumnFamily> {
        let name = self.column_names.get(i).ok_or_else(|| invalid_column(i as u32))?;
        self.db
            .cf_handle(&name)
            .ok_or_else(|| other_io_err(format!("invalid column name: {name}")))
    }
}

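/// Key-value database backed by RocksDB.
///
/// A minimal open/write/read sketch, mirroring the tests below (paths assume
/// this file builds as the `kvdb_rocksdb` crate and are illustrative only):
///
/// ```no_run
/// use kvdb_rocksdb::{Database, DatabaseConfig};
///
/// let config = DatabaseConfig::with_columns(1);
/// let db = Database::open(&config, "my-db").unwrap();
///
/// let mut tx = db.transaction();
/// tx.put(0, b"key", b"value");
/// db.write(tx).unwrap();
///
/// assert_eq!(db.get(0, b"key").unwrap(), Some(b"value".to_vec()));
/// ```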
pub struct Database {
    inner: DBAndColumns,
    config: DatabaseConfig,
    opts: Options,
    write_opts: WriteOptions,
    read_opts: ReadOptions,
    block_opts: BlockBasedOptions,
    stats: stats::RunningDbStats,
}

/// Generate the database-wide options from the config.
fn generate_options(config: &DatabaseConfig) -> Options {
    let mut opts = Options::default();

    opts.set_report_bg_io_stats(true);
    if config.enable_statistics {
        opts.enable_statistics();
    }
    opts.set_use_fsync(false);
    opts.create_if_missing(config.create_if_missing);
    if config.secondary.is_some() {
        opts.set_max_open_files(-1)
    } else {
        opts.set_max_open_files(config.max_open_files);
    }
    opts.set_bytes_per_sync(MB as u64);
    // Honour the configured number of info log files to keep.
    opts.set_keep_log_file_num(config.keep_log_file_num as usize);
    opts.increase_parallelism(cmp::max(1, num_cpus::get() as i32 / 2));
    if let Some(m) = config.max_total_wal_size {
        opts.set_max_total_wal_size(m);
    }

    opts
}

fn generate_read_options() -> ReadOptions {
    let mut read_opts = ReadOptions::default();
    read_opts.set_verify_checksums(false);
    read_opts
}

/// Generate the block-based table options from the config.
fn generate_block_based_options(config: &DatabaseConfig) -> io::Result<BlockBasedOptions> {
    let mut block_opts = BlockBasedOptions::default();
    block_opts.set_block_size(config.compaction.block_size);
    block_opts.set_format_version(5);
    block_opts.set_block_restart_interval(16);
    // Use a third of the total memory budget for the block cache, shared across all columns.
    let cache_size = config.memory_budget() / 3;
    if cache_size == 0 {
        block_opts.disable_cache()
    } else {
        let cache = rocksdb::Cache::new_lru_cache(cache_size);
        block_opts.set_block_cache(&cache);
        // Cache index and filter blocks and pin the L0 ones so they are not evicted.
        block_opts.set_cache_index_and_filter_blocks(true);
        block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
    }
    block_opts.set_bloom_filter(10.0, true);

    Ok(block_opts)
}

impl Database {
    /// Open the database at the given path with the given config. If
    /// `config.secondary` is set, the database is opened as a read-only
    /// secondary instance.
    ///
    /// # Panics
    ///
    /// Panics if `config.columns` is zero.
    pub fn open<P: AsRef<Path>>(config: &DatabaseConfig, path: P) -> io::Result<Database> {
        assert!(config.columns > 0, "the number of columns must not be zero");

        let opts = generate_options(config);
        let block_opts = generate_block_based_options(config)?;

        let column_names: Vec<_> = (0..config.columns).map(|c| format!("col{}", c)).collect();
        let write_opts = WriteOptions::default();
        let read_opts = generate_read_options();

        let db = if let Some(secondary_path) = &config.secondary {
            Self::open_secondary(&opts, path.as_ref(), secondary_path.as_ref(), column_names.as_slice())?
        } else {
            let column_names: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
            Self::open_primary(&opts, path.as_ref(), config, column_names.as_slice(), &block_opts)?
        };

        Ok(Database {
            inner: DBAndColumns { db, column_names },
            config: config.clone(),
            opts,
            read_opts,
            write_opts,
            block_opts,
            stats: stats::RunningDbStats::new(),
        })
    }

    /// Internal API to open the database in primary (read-write) mode.
    fn open_primary<P: AsRef<Path>>(
        opts: &Options,
        path: P,
        config: &DatabaseConfig,
        column_names: &[&str],
        block_opts: &BlockBasedOptions,
    ) -> io::Result<rocksdb::DB> {
        let cf_descriptors: Vec<_> = (0..config.columns)
            .map(|i| ColumnFamilyDescriptor::new(column_names[i as usize], config.column_config(&block_opts, i)))
            .collect();

        let db = match DB::open_cf_descriptors(&opts, path.as_ref(), cf_descriptors) {
            Err(_) => {
                // The database may exist without the expected column families:
                // open it bare and create them one by one.
                match DB::open_cf(&opts, path.as_ref(), &[] as &[&str]) {
                    Ok(mut db) => {
                        for (i, name) in column_names.iter().enumerate() {
                            let _ = db
                                .create_cf(name, &config.column_config(&block_opts, i as u32))
                                .map_err(other_io_err)?;
                        }
                        Ok(db)
                    },
                    err => err,
                }
            },
            ok => ok,
        };

        Ok(match db {
            Ok(db) => db,
            Err(s) => return Err(other_io_err(s)),
        })
    }

    /// Internal API to open the database in secondary (read-only) mode, using
    /// `secondary_path` for the secondary instance's own files.
    fn open_secondary<P: AsRef<Path>>(
        opts: &Options,
        path: P,
        secondary_path: P,
        column_names: &[String],
    ) -> io::Result<rocksdb::DB> {
        let db = DB::open_cf_as_secondary(&opts, path.as_ref(), secondary_path.as_ref(), column_names);

        Ok(match db {
            Ok(db) => db,
            Err(s) => return Err(other_io_err(s)),
        })
    }

    /// Create a new transaction for this database.
    pub fn transaction(&self) -> DBTransaction {
        DBTransaction::new()
    }

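    /// Commit a transaction to the database atomically, as a single RocksDB
    /// write batch.
    ///
    /// `DeletePrefix` operations become range deletes; when the prefix has no
    /// natural upper bound, the keys the range delete cannot cover are deleted
    /// individually.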
    pub fn write(&self, tr: DBTransaction) -> io::Result<()> {
        let cfs = &self.inner;
        let mut batch = WriteBatch::default();
        let ops = tr.ops;

        self.stats.tally_writes(ops.len() as u64);
        self.stats.tally_transactions(1);

        let mut stats_total_bytes = 0;

        for op in ops {
            let col = op.col();
            let cf = cfs.cf(col as usize)?;

            match op {
                DBOp::Insert { col: _, key, value } => {
                    stats_total_bytes += key.len() + value.len();
                    batch.put_cf(cf, &key, &value);
                },
                DBOp::Delete { col: _, key } => {
                    // Deletes are counted towards bytes written.
                    stats_total_bytes += key.len();
                    batch.delete_cf(cf, &key);
                },
                DBOp::DeletePrefix { col, prefix } => {
                    let end_prefix = kvdb::end_prefix(&prefix[..]);
                    let no_end = end_prefix.is_none();
                    let end_range = end_prefix.unwrap_or_else(|| vec![u8::max_value(); 16]);
                    batch.delete_range_cf(cf, &prefix[..], &end_range[..]);
                    if no_end {
                        // The prefix has no natural end (it is empty or all 0xff), so the
                        // range delete above stops at a sixteen-byte 0xff sentinel. Delete
                        // the remaining matching keys one by one.
                        let prefix = if prefix.len() > end_range.len() { &prefix[..] } else { &end_range[..] };
                        for result in self.iter_with_prefix(col, prefix) {
                            let (key, _) = result?;
                            batch.delete_cf(cf, &key[..]);
                        }
                    }
                },
            };
        }
        self.stats.tally_bytes_written(stats_total_bytes as u64);

        cfs.db.write_opt(batch, &self.write_opts).map_err(other_io_err)
    }

    /// Get the value stored under `key` in the given column.
    pub fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
        let cfs = &self.inner;
        let cf = cfs.cf(col as usize)?;
        self.stats.tally_reads(1);
        let value = cfs
            .db
            .get_pinned_cf_opt(cf, key, &self.read_opts)
            .map(|r| r.map(|v| v.to_vec()))
            .map_err(other_io_err);

        match value {
            Ok(Some(ref v)) => self.stats.tally_bytes_read((key.len() + v.len()) as u64),
            Ok(None) => self.stats.tally_bytes_read(key.len() as u64),
            _ => {},
        };

        value
    }

    /// Get the value of the first key that starts with the given prefix.
    pub fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> io::Result<Option<DBValue>> {
        self.iter_with_prefix(col, prefix)
            .next()
            .transpose()
            .map(|m| m.map(|(_k, v)| v))
    }

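    /// Iterate over the data of a given column, yielding owned `(key, value)`
    /// pairs in key order.
    ///
    /// A minimal sketch (paths assume this file builds as the `kvdb_rocksdb`
    /// crate and are illustrative only):
    ///
    /// ```no_run
    /// use kvdb_rocksdb::{Database, DatabaseConfig};
    ///
    /// let db = Database::open(&DatabaseConfig::with_columns(1), "my-db").unwrap();
    /// for entry in db.iter(0) {
    ///     let (key, value) = entry.unwrap();
    ///     println!("{} key bytes, {} value bytes", key.len(), value.len());
    /// }
    /// ```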
    pub fn iter<'a>(&'a self, col: u32) -> impl Iterator<Item = io::Result<DBKeyValue>> + 'a {
        let read_opts = generate_read_options();
        iter::IterationHandler::iter(&self.inner, col, read_opts)
    }

    /// Iterate over the data of a given column, returning only key/value pairs
    /// where the key starts with the given prefix.
    fn iter_with_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> impl Iterator<Item = io::Result<DBKeyValue>> + 'a {
        let mut read_opts = generate_read_options();
        // Only set an upper bound when the prefix has one; otherwise iterate to the end.
        if let Some(end_prefix) = kvdb::end_prefix(prefix) {
            read_opts.set_iterate_upper_bound(end_prefix);
        }
        iter::IterationHandler::iter_with_prefix(&self.inner, col, prefix, read_opts)
    }

    /// The number of column families in the database.
    pub fn num_columns(&self) -> u32 {
        self.inner.column_names.len() as u32
    }

    /// The number of keys in a column (estimated).
    pub fn num_keys(&self, col: u32) -> io::Result<u64> {
        const ESTIMATE_NUM_KEYS: &str = "rocksdb.estimate-num-keys";
        let cfs = &self.inner;
        let cf = cfs.cf(col as usize)?;
        match cfs.db.property_int_value_cf(cf, ESTIMATE_NUM_KEYS) {
            Ok(estimate) => Ok(estimate.unwrap_or_default()),
            Err(err_string) => Err(other_io_err(err_string)),
        }
    }

    /// Remove the last column family from the database. The deletion is definitive.
    pub fn remove_last_column(&mut self) -> io::Result<()> {
        let DBAndColumns { ref mut db, ref mut column_names } = self.inner;
        if let Some(name) = column_names.pop() {
            db.drop_cf(&name).map_err(other_io_err)?;
        }
        Ok(())
    }

    /// Add a new column family to the database.
    pub fn add_column(&mut self) -> io::Result<()> {
        let DBAndColumns { ref mut db, ref mut column_names } = self.inner;
        let col = column_names.len() as u32;
        let name = format!("col{}", col);
        let col_config = self.config.column_config(&self.block_opts, col);
        let _ = db.create_cf(&name, &col_config).map_err(other_io_err)?;
        column_names.push(name);
        Ok(())
    }

    /// Get RocksDB statistics. Returns an empty map unless the database was
    /// opened with `enable_statistics` set.
    pub fn get_statistics(&self) -> HashMap<String, stats::RocksDbStatsValue> {
        if let Some(stats) = self.opts.get_statistics() {
            stats::parse_rocksdb_stats(&stats)
        } else {
            HashMap::new()
        }
    }

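    /// Try to catch up a secondary instance with its primary by tailing the
    /// primary's recent writes.
    ///
    /// A minimal sketch of the secondary-instance flow, mirroring the tests
    /// below (paths assume this file builds as the `kvdb_rocksdb` crate and are
    /// illustrative only):
    ///
    /// ```no_run
    /// use kvdb_rocksdb::{Database, DatabaseConfig};
    ///
    /// // The primary instance owns the data directory.
    /// let primary = Database::open(&DatabaseConfig::with_columns(1), "my-db").unwrap();
    ///
    /// // A secondary instance opens the same directory read-only and keeps its
    /// // own files under the `secondary` path.
    /// let mut config = DatabaseConfig::with_columns(1);
    /// config.secondary = Some("my-db-secondary".into());
    /// let secondary = Database::open(&config, "my-db").unwrap();
    ///
    /// // After the primary writes, the secondary catches up explicitly.
    /// secondary.try_catch_up_with_primary().unwrap();
    /// ```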
    pub fn try_catch_up_with_primary(&self) -> io::Result<()> {
        self.inner.db.try_catch_up_with_primary().map_err(other_io_err)
    }

    /// Force a compaction of the whole key range of the given column, including
    /// the bottommost level.
    pub fn force_compact(&self, col: u32) -> io::Result<()> {
        let mut compact_options = CompactOptions::default();
        compact_options.set_bottommost_level_compaction(rocksdb::BottommostLevelCompaction::Force);
        self.inner.db.compact_range_cf_opt(
            self.inner.cf(col as usize)?,
            None::<Vec<u8>>,
            None::<Vec<u8>>,
            &compact_options,
        );
        Ok(())
    }
}

impl KeyValueDB for Database {
    fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
        Database::get(self, col, key)
    }

    fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> io::Result<Option<DBValue>> {
        Database::get_by_prefix(self, col, prefix)
    }

    fn write(&self, transaction: DBTransaction) -> io::Result<()> {
        Database::write(self, transaction)
    }

    fn iter<'a>(&'a self, col: u32) -> Box<dyn Iterator<Item = io::Result<DBKeyValue>> + 'a> {
        let unboxed = Database::iter(self, col);
        Box::new(unboxed)
    }

    fn iter_with_prefix<'a>(
        &'a self,
        col: u32,
        prefix: &'a [u8],
    ) -> Box<dyn Iterator<Item = io::Result<DBKeyValue>> + 'a> {
        let unboxed = Database::iter_with_prefix(self, col, prefix);
        Box::new(unboxed)
    }

    fn io_stats(&self, kind: kvdb::IoStatsKind) -> kvdb::IoStats {
        let rocksdb_stats = self.get_statistics();
        let cache_hit_count = rocksdb_stats.get("block.cache.hit").map(|s| s.count).unwrap_or(0u64);
        let overall_stats = self.stats.overall();
        let old_cache_hit_count = overall_stats.raw.cache_hit_count;

        self.stats.tally_cache_hit_count(cache_hit_count - old_cache_hit_count);

        let taken_stats = match kind {
            kvdb::IoStatsKind::Overall => self.stats.overall(),
            kvdb::IoStatsKind::SincePrevious => self.stats.since_previous(),
        };

        let mut stats = kvdb::IoStats::empty();

        stats.reads = taken_stats.raw.reads;
        stats.writes = taken_stats.raw.writes;
        stats.transactions = taken_stats.raw.transactions;
        stats.bytes_written = taken_stats.raw.bytes_written;
        stats.bytes_read = taken_stats.raw.bytes_read;
        stats.cache_reads = taken_stats.raw.cache_hit_count;
        stats.started = taken_stats.started;
        stats.span = taken_stats.started.elapsed();

        stats
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use kvdb_shared_tests as st;
    use std::io::{self, Read};
    use tempfile::Builder as TempfileBuilder;

    fn create(columns: u32) -> io::Result<Database> {
        let tempdir = TempfileBuilder::new().prefix("").tempdir()?;
        let config = DatabaseConfig::with_columns(columns);
        Database::open(&config, tempdir.path().to_str().expect("tempdir path is valid unicode"))
    }

    #[test]
    fn get_fails_with_non_existing_column() -> io::Result<()> {
        let db = create(1)?;
        st::test_get_fails_with_non_existing_column(&db)
    }

    #[test]
    fn put_and_get() -> io::Result<()> {
        let db = create(1)?;
        st::test_put_and_get(&db)
    }

    #[test]
    fn delete_and_get() -> io::Result<()> {
        let db = create(1)?;
        st::test_delete_and_get(&db)
    }

    #[test]
    fn delete_prefix() -> io::Result<()> {
        let db = create(st::DELETE_PREFIX_NUM_COLUMNS)?;
        st::test_delete_prefix(&db)
    }

    #[test]
    fn iter() -> io::Result<()> {
        let db = create(1)?;
        st::test_iter(&db)
    }

    #[test]
    fn iter_with_prefix() -> io::Result<()> {
        let db = create(1)?;
        st::test_iter_with_prefix(&db)
    }
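
    // An extra illustrative test, not part of the original shared-test suite:
    // exercises `get_by_prefix` directly with concrete keys.
    #[test]
    fn get_by_prefix_returns_first_match() -> io::Result<()> {
        let db = create(1)?;

        let mut tx = db.transaction();
        tx.put(0, b"prefix-a", b"one");
        tx.put(0, b"prefix-b", b"two");
        tx.put(0, b"other", b"three");
        db.write(tx)?;

        // Keys are iterated in order, so the first match for the prefix is `prefix-a`.
        assert_eq!(db.get_by_prefix(0, b"prefix-")?.as_deref(), Some(&b"one"[..]));
        assert!(db.get_by_prefix(0, b"missing")?.is_none());
        Ok(())
    }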

    #[test]
    fn complex() -> io::Result<()> {
        let db = create(1)?;
        st::test_complex(&db)
    }

    #[test]
    fn stats() -> io::Result<()> {
        let db = create(st::IO_STATS_NUM_COLUMNS)?;
        st::test_io_stats(&db)
    }

    #[test]
    fn secondary_db_get() -> io::Result<()> {
        let primary = TempfileBuilder::new().prefix("").tempdir()?;
        let secondary = TempfileBuilder::new().prefix("").tempdir()?;
        let config = DatabaseConfig::with_columns(1);
        let db = Database::open(&config, primary.path()).unwrap();

        let key1 = b"key1";
        let mut transaction = db.transaction();
        transaction.put(0, key1, b"horse");
        db.write(transaction)?;

        let config = DatabaseConfig { secondary: Some(secondary.path().to_owned()), ..DatabaseConfig::with_columns(1) };
        let second_db = Database::open(&config, primary.path()).unwrap();
        assert_eq!(&*second_db.get(0, key1)?.unwrap(), b"horse");
        Ok(())
    }

    #[test]
    fn secondary_db_catch_up() -> io::Result<()> {
        let primary = TempfileBuilder::new().prefix("").tempdir()?;
        let secondary = TempfileBuilder::new().prefix("").tempdir()?;
        let config = DatabaseConfig::with_columns(1);
        let db = Database::open(&config, primary.path()).unwrap();

        let config = DatabaseConfig { secondary: Some(secondary.path().to_owned()), ..DatabaseConfig::with_columns(1) };
        let second_db = Database::open(&config, primary.path()).unwrap();

        let mut transaction = db.transaction();
        transaction.put(0, b"key1", b"mule");
        transaction.put(0, b"key2", b"cat");
        db.write(transaction)?;

        second_db.try_catch_up_with_primary()?;
        assert_eq!(&*second_db.get(0, b"key2")?.unwrap(), b"cat");
        Ok(())
    }

    #[test]
    #[cfg(target_os = "linux")]
    fn df_to_rotational() {
        use std::path::PathBuf;
        let example_df = vec![
            70, 105, 108, 101, 115, 121, 115, 116, 101, 109, 32, 32, 32, 32, 32, 49, 75, 45, 98, 108, 111, 99, 107,
            115, 32, 32, 32, 32, 32, 85, 115, 101, 100, 32, 65, 118, 97, 105, 108, 97, 98, 108, 101, 32, 85, 115, 101,
            37, 32, 77, 111, 117, 110, 116, 101, 100, 32, 111, 110, 10, 47, 100, 101, 118, 47, 115, 100, 97, 49, 32,
            32, 32, 32, 32, 32, 32, 32, 54, 49, 52, 48, 57, 51, 48, 48, 32, 51, 56, 56, 50, 50, 50, 51, 54, 32, 32, 49, 57,
            52, 52, 52, 54, 49, 54, 32, 32, 54, 55, 37, 32, 47, 10,
        ];
        let expected_output = Some(PathBuf::from("/sys/block/sda/queue/rotational"));
        assert_eq!(rotational_from_df_output(example_df), expected_output);
    }

    #[test]
    #[should_panic]
    fn db_config_with_zero_columns() {
        let _cfg = DatabaseConfig::with_columns(0);
    }

    #[test]
    #[should_panic]
    fn open_db_with_zero_columns() {
        let cfg = DatabaseConfig { columns: 0, ..Default::default() };
        let _db = Database::open(&cfg, "");
    }

    #[test]
    fn add_columns() {
        let config_1 = DatabaseConfig::default();
        let config_5 = DatabaseConfig::with_columns(5);

        let tempdir = TempfileBuilder::new().prefix("").tempdir().unwrap();

        {
            let mut db = Database::open(&config_1, tempdir.path().to_str().unwrap()).unwrap();
            assert_eq!(db.num_columns(), 1);

            for i in 2..=5 {
                db.add_column().unwrap();
                assert_eq!(db.num_columns(), i);
            }
        }

        {
            let db = Database::open(&config_5, tempdir.path().to_str().unwrap()).unwrap();
            assert_eq!(db.num_columns(), 5);
        }
    }

    #[test]
    fn remove_columns() {
        let config_1 = DatabaseConfig::default();
        let config_5 = DatabaseConfig::with_columns(5);

        let tempdir = TempfileBuilder::new().prefix("drop_columns").tempdir().unwrap();

        {
            let mut db = Database::open(&config_5, tempdir.path()).expect("open with 5 columns");
            assert_eq!(db.num_columns(), 5);

            for i in (1..5).rev() {
                db.remove_last_column().unwrap();
                assert_eq!(db.num_columns(), i);
            }
        }

        {
            let db = Database::open(&config_1, tempdir.path().to_str().unwrap()).unwrap();
            assert_eq!(db.num_columns(), 1);
        }
    }

    #[test]
    fn test_num_keys() {
        let tempdir = TempfileBuilder::new().prefix("").tempdir().unwrap();
        let config = DatabaseConfig::with_columns(1);
        let db = Database::open(&config, tempdir.path()).unwrap();

        assert_eq!(db.num_keys(0).unwrap(), 0, "database is empty after creation");
        let key1 = b"beef";
        let mut batch = db.transaction();
        batch.put(0, key1, key1);
        db.write(batch).unwrap();
        assert_eq!(db.num_keys(0).unwrap(), 1, "adding a key increases the count");
    }

    #[test]
    fn default_memory_budget() {
        let c = DatabaseConfig::default();
        assert_eq!(c.columns, 1);
        assert_eq!(c.memory_budget(), DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB, "total memory budget is default");
        assert_eq!(
            c.memory_budget_for_col(0),
            DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB,
            "total memory budget for column 0 is the default"
        );
        assert_eq!(
            c.memory_budget_for_col(999),
            DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB,
            "total memory budget for any column is the default"
        );
    }

    #[test]
    fn memory_budget() {
        let mut c = DatabaseConfig::with_columns(3);
        c.memory_budget = [(0, 10), (1, 15), (2, 20)].iter().cloned().collect();
        assert_eq!(c.memory_budget(), 45 * MB, "total budget is the sum of the column budget");
    }

    #[test]
    fn test_stats_parser() {
        let raw = r#"rocksdb.row.cache.hit COUNT : 1
rocksdb.db.get.micros P50 : 2.000000 P95 : 3.000000 P99 : 4.000000 P100 : 5.000000 COUNT : 0 SUM : 15
"#;
        let stats = stats::parse_rocksdb_stats(raw);
        assert_eq!(stats["row.cache.hit"].count, 1);
        assert!(stats["row.cache.hit"].times.is_none());
        assert_eq!(stats["db.get.micros"].count, 0);
        let get_times = stats["db.get.micros"].times.unwrap();
        assert_eq!(get_times.sum, 15);
        assert_eq!(get_times.p50, 2.0);
        assert_eq!(get_times.p95, 3.0);
        assert_eq!(get_times.p99, 4.0);
        assert_eq!(get_times.p100, 5.0);
    }

    #[test]
    fn rocksdb_settings() {
        const NUM_COLS: usize = 2;
        let mut cfg = DatabaseConfig { enable_statistics: true, ..DatabaseConfig::with_columns(NUM_COLS as u32) };
        cfg.max_open_files = 123;
        cfg.compaction.block_size = 323232;
        cfg.compaction.initial_file_size = 102030;
        cfg.memory_budget = [(0, 30), (1, 300)].iter().cloned().collect();

        let db_path = TempfileBuilder::new()
            .prefix("config_test")
            .tempdir()
            .expect("the OS can create tmp dirs");
        let db = Database::open(&cfg, db_path.path()).expect("can open a db");
        let statistics = db.get_statistics();
        assert!(statistics.contains_key("block.cache.hit"));
        drop(db);

        let mut rocksdb_log = std::fs::File::open(format!("{}/LOG", db_path.path().to_str().unwrap()))
            .expect("rocksdb creates a LOG file");
        let mut settings = String::new();
        rocksdb_log.read_to_string(&mut settings).unwrap();
        assert!(settings.contains("Options for column family [default]"), "no default col");
        assert!(settings.contains("Options for column family [col0]"), "no col0");
        assert!(settings.contains("Options for column family [col1]"), "no col1");

        assert!(settings.contains("max_open_files: 123"));

        assert!(settings.contains(" block_size: 323232"));

        assert!(settings.contains("block_cache_options:\n capacity : 115343360"));
        let lru_size = (330 * MB) / 3;
        let needle = format!("block_cache_options:\n capacity : {}", lru_size);
        let lru = settings.match_indices(&needle).collect::<Vec<_>>().len();
        assert_eq!(lru, NUM_COLS);

        let include_indexes = settings.matches("cache_index_and_filter_blocks: 1").collect::<Vec<_>>().len();
        assert_eq!(include_indexes, NUM_COLS);
        let pins = settings
            .matches("pin_l0_filter_and_index_blocks_in_cache: 1")
            .collect::<Vec<_>>()
            .len();
        assert_eq!(pins, NUM_COLS);

        let l0_sizes = settings.matches("target_file_size_base: 102030").collect::<Vec<_>>().len();
        assert_eq!(l0_sizes, NUM_COLS);
        assert!(settings.contains("target_file_size_base: 67108864"));

        let snappy_compression = settings.matches("Options.compression: Snappy").collect::<Vec<_>>().len();
        assert_eq!(snappy_compression, NUM_COLS + 1);
        let snappy_bottommost = settings
            .matches("Options.bottommost_compression: Disabled")
            .collect::<Vec<_>>()
            .len();
        assert_eq!(snappy_bottommost, NUM_COLS + 1);

        let levels = settings.matches("Options.num_levels: 7").collect::<Vec<_>>().len();
        assert_eq!(levels, NUM_COLS + 1);

        assert!(settings.contains("Options.use_fsync: 0"));

        assert!(settings.contains("format_version: 5"));
    }
}