1mod iter;
10mod stats;
11
12use std::{cmp, collections::HashMap, convert::identity, error, fs, io, mem, path::Path, result};
13
14use tetsy_util_mem::MallocSizeOf;
15use parking_lot::RwLock;
16use rocksdb::{
17 BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Error, Options, ReadOptions, WriteBatch, WriteOptions, DB,
18};
19
20use crate::iter::KeyValuePair;
21use fs_swap::{swap, swap_nonatomic};
22use tetsy_kvdb::{DBOp, DBTransaction, DBValue, KeyValueDB};
23use log::{debug, warn};
24
25#[cfg(target_os = "linux")]
26use regex::Regex;
27#[cfg(target_os = "linux")]
28use std::fs::File;
29#[cfg(target_os = "linux")]
30use std::path::PathBuf;
31#[cfg(target_os = "linux")]
32use std::process::Command;
33
/// Converts any boxable error (or plain message) into an [`io::Error`] of kind `Other`.
fn other_io_err<E: Into<Box<dyn error::Error + Send + Sync>>>(e: E) -> io::Error {
    let boxed: Box<dyn error::Error + Send + Sync> = e.into();
    io::Error::new(io::ErrorKind::Other, boxed)
}
40
/// A quantity of memory counted in mebibytes.
type MiB = usize;

/// Bytes in one kibibyte.
const KB: usize = 1 << 10;
/// Bytes in one mebibyte.
const MB: usize = KB << 10;

/// Memory budget (in MiB) used for a column that has no explicit entry in
/// `DatabaseConfig::memory_budget`.
pub const DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB: MiB = 128;

/// Default overall memory budget, in MiB.
// NOTE(review): not referenced anywhere in the visible part of this file; presumably
// consumed by callers of this crate.
pub const DB_DEFAULT_MEMORY_BUDGET_MB: MiB = 512;
52
/// Storage-dependent tuning values applied when configuring the database.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct CompactionProfile {
    /// Value handed to RocksDB as the target file size base of every column.
    pub initial_file_size: u64,
    /// Block size used for the block-based table options.
    pub block_size: usize,
}
66
67impl Default for CompactionProfile {
68 fn default() -> CompactionProfile {
70 CompactionProfile::ssd()
71 }
72}
73
74#[cfg(target_os = "linux")]
76pub fn rotational_from_df_output(df_out: Vec<u8>) -> Option<PathBuf> {
77 use std::str;
78 str::from_utf8(df_out.as_slice())
79 .ok()
80 .and_then(|df_str| {
82 Regex::new(r"/dev/(sd[:alpha:]{1,2})")
83 .ok()
84 .and_then(|re| re.captures(df_str))
85 .and_then(|captures| captures.get(1))
86 })
87 .map(|drive_path| {
89 let mut p = PathBuf::from("/sys/block");
90 p.push(drive_path.as_str());
91 p.push("queue/rotational");
92 p
93 })
94}
95
96impl CompactionProfile {
97 #[cfg(target_os = "linux")]
99 pub fn auto(db_path: &Path) -> CompactionProfile {
100 use std::io::Read;
101 let hdd_check_file = db_path
102 .to_str()
103 .and_then(|path_str| Command::new("df").arg(path_str).output().ok())
104 .and_then(|df_res| if df_res.status.success() { Some(df_res.stdout) } else { None })
105 .and_then(rotational_from_df_output);
106 if let Some(hdd_check) = hdd_check_file {
108 if let Ok(mut file) = File::open(hdd_check.as_path()) {
109 let mut buffer = [0; 1];
110 if file.read_exact(&mut buffer).is_ok() {
111 if buffer == [48] {
113 return Self::ssd();
114 }
115 if buffer == [49] {
117 return Self::hdd();
118 }
119 }
120 }
121 }
122 Self::default()
124 }
125
126 #[cfg(not(target_os = "linux"))]
128 pub fn auto(_db_path: &Path) -> CompactionProfile {
129 Self::default()
130 }
131
132 pub fn ssd() -> CompactionProfile {
134 CompactionProfile { initial_file_size: 64 * MB as u64, block_size: 16 * KB }
135 }
136
137 pub fn hdd() -> CompactionProfile {
139 CompactionProfile { initial_file_size: 256 * MB as u64, block_size: 64 * KB }
140 }
141}
142
143#[derive(Clone)]
145pub struct DatabaseConfig {
146 pub max_open_files: i32,
148 pub memory_budget: HashMap<u32, MiB>,
153 pub compaction: CompactionProfile,
155 pub columns: u32,
161 pub keep_log_file_num: i32,
163 pub enable_statistics: bool,
169 pub secondary: Option<String>,
180}
181
182impl DatabaseConfig {
183 pub fn with_columns(columns: u32) -> Self {
190 assert!(columns > 0, "the number of columns must not be zero");
191
192 Self { columns, ..Default::default() }
193 }
194
195 pub fn memory_budget(&self) -> MiB {
197 (0..self.columns).map(|i| self.memory_budget.get(&i).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB).sum()
198 }
199
200 fn memory_budget_for_col(&self, col: u32) -> MiB {
202 self.memory_budget.get(&col).unwrap_or(&DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB
203 }
204
205 fn column_config(&self, block_opts: &BlockBasedOptions, col: u32) -> Options {
207 let column_mem_budget = self.memory_budget_for_col(col);
208 let mut opts = Options::default();
209
210 opts.set_level_compaction_dynamic_level_bytes(true);
211 opts.set_block_based_table_factory(block_opts);
212 opts.optimize_level_style_compaction(column_mem_budget);
213 opts.set_target_file_size_base(self.compaction.initial_file_size);
214 opts.set_compression_per_level(&[]);
215
216 opts
217 }
218}
219
220impl Default for DatabaseConfig {
221 fn default() -> DatabaseConfig {
222 DatabaseConfig {
223 max_open_files: 512,
224 memory_budget: HashMap::new(),
225 compaction: CompactionProfile::default(),
226 columns: 1,
227 keep_log_file_num: 1,
228 enable_statistics: false,
229 secondary: None,
230 }
231 }
232}
233
234struct DBAndColumns {
235 db: DB,
236 column_names: Vec<String>,
237}
238
239impl MallocSizeOf for DBAndColumns {
240 fn size_of(&self, ops: &mut tetsy_util_mem::MallocSizeOfOps) -> usize {
241 let mut total = self.column_names.size_of(ops)
242 + self.db
244 .property_int_value_cf(self.cf(0), "rocksdb.block-cache-usage")
245 .unwrap_or(Some(0))
246 .map(|x| x as usize)
247 .unwrap_or(0);
248
249 for v in 0..self.column_names.len() {
250 total += self.static_property_or_warn(v, "rocksdb.estimate-table-readers-mem");
251 total += self.static_property_or_warn(v, "rocksdb.cur-size-all-mem-tables");
252 }
253
254 total
255 }
256}
257
258impl DBAndColumns {
259 fn cf(&self, i: usize) -> &ColumnFamily {
260 self.db.cf_handle(&self.column_names[i]).expect("the specified column name is correct; qed")
261 }
262
263 fn static_property_or_warn(&self, col: usize, prop: &str) -> usize {
264 match self.db.property_int_value_cf(self.cf(col), prop) {
265 Ok(Some(v)) => v as usize,
266 _ => {
267 warn!("Cannot read expected static property of RocksDb database: {}", prop);
268 0
269 }
270 }
271 }
272}
273
274#[derive(MallocSizeOf)]
276pub struct Database {
277 db: RwLock<Option<DBAndColumns>>,
278 #[ignore_malloc_size_of = "insignificant"]
279 config: DatabaseConfig,
280 path: String,
281 #[ignore_malloc_size_of = "insignificant"]
282 opts: Options,
283 #[ignore_malloc_size_of = "insignificant"]
284 write_opts: WriteOptions,
285 #[ignore_malloc_size_of = "insignificant"]
286 read_opts: ReadOptions,
287 #[ignore_malloc_size_of = "insignificant"]
288 block_opts: BlockBasedOptions,
289 #[ignore_malloc_size_of = "insignificant"]
290 stats: stats::RunningDbStats,
291}
292
293#[inline]
294fn check_for_corruption<T, P: AsRef<Path>>(path: P, res: result::Result<T, Error>) -> io::Result<T> {
295 if let Err(ref s) = res {
296 if is_corrupted(s) {
297 warn!("DB corrupted: {}. Repair will be triggered on next restart", s);
298 let _ = fs::File::create(path.as_ref().join(Database::CORRUPTION_FILE_NAME));
299 }
300 }
301
302 res.map_err(other_io_err)
303}
304
305fn is_corrupted(err: &Error) -> bool {
306 err.as_ref().starts_with("Corruption:")
307 || err.as_ref().starts_with("Invalid argument: You have to open all column families")
308}
309
310fn generate_options(config: &DatabaseConfig) -> Options {
312 let mut opts = Options::default();
313
314 opts.set_report_bg_io_stats(true);
315 if config.enable_statistics {
316 opts.enable_statistics();
317 }
318 opts.set_use_fsync(false);
319 opts.create_if_missing(true);
320 if config.secondary.is_some() {
321 opts.set_max_open_files(-1)
322 } else {
323 opts.set_max_open_files(config.max_open_files);
324 }
325 opts.set_bytes_per_sync(1 * MB as u64);
326 opts.set_keep_log_file_num(1);
327 opts.increase_parallelism(cmp::max(1, num_cpus::get() as i32 / 2));
328
329 opts
330}
331
332fn generate_read_options() -> ReadOptions {
333 let mut read_opts = ReadOptions::default();
334 read_opts.set_verify_checksums(false);
335 read_opts
336}
337
338fn generate_block_based_options(config: &DatabaseConfig) -> io::Result<BlockBasedOptions> {
340 let mut block_opts = BlockBasedOptions::default();
341 block_opts.set_block_size(config.compaction.block_size);
342 block_opts.set_format_version(5);
344 block_opts.set_block_restart_interval(16);
345 let cache_size = config.memory_budget() / 3;
348 if cache_size == 0 {
349 block_opts.disable_cache()
350 } else {
351 let cache = rocksdb::Cache::new_lru_cache(cache_size).map_err(other_io_err)?;
352 block_opts.set_block_cache(&cache);
353 block_opts.set_cache_index_and_filter_blocks(true);
356 block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
358 }
359 block_opts.set_bloom_filter(10, true);
360
361 Ok(block_opts)
362}
363
364impl Database {
365 const CORRUPTION_FILE_NAME: &'static str = "CORRUPTED";
366
367 pub fn open(config: &DatabaseConfig, path: &str) -> io::Result<Database> {
373 assert!(config.columns > 0, "the number of columns must not be zero");
374
375 let opts = generate_options(config);
376 let block_opts = generate_block_based_options(config)?;
377
378 let db_corrupted = Path::new(path).join(Database::CORRUPTION_FILE_NAME);
380 if db_corrupted.exists() {
381 warn!("DB has been previously marked as corrupted, attempting repair");
382 DB::repair(&opts, path).map_err(other_io_err)?;
383 fs::remove_file(db_corrupted)?;
384 }
385
386 let column_names: Vec<_> = (0..config.columns).map(|c| format!("col{}", c)).collect();
387 let write_opts = WriteOptions::default();
388 let read_opts = generate_read_options();
389
390 let db = if let Some(secondary_path) = &config.secondary {
391 Self::open_secondary(&opts, path, secondary_path.as_str(), column_names.as_slice())?
392 } else {
393 let column_names: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
394 Self::open_primary(&opts, path, config, column_names.as_slice(), &block_opts)?
395 };
396
397 Ok(Database {
398 db: RwLock::new(Some(DBAndColumns { db, column_names })),
399 config: config.clone(),
400 path: path.to_owned(),
401 opts,
402 read_opts,
403 write_opts,
404 block_opts,
405 stats: stats::RunningDbStats::new(),
406 })
407 }
408
409 fn open_primary(
411 opts: &Options,
412 path: &str,
413 config: &DatabaseConfig,
414 column_names: &[&str],
415 block_opts: &BlockBasedOptions,
416 ) -> io::Result<rocksdb::DB> {
417 let cf_descriptors: Vec<_> = (0..config.columns)
418 .map(|i| ColumnFamilyDescriptor::new(column_names[i as usize], config.column_config(&block_opts, i)))
419 .collect();
420
421 let db = match DB::open_cf_descriptors(&opts, path, cf_descriptors) {
422 Err(_) => {
423 match DB::open_cf(&opts, path, &[] as &[&str]) {
425 Ok(mut db) => {
426 for (i, name) in column_names.iter().enumerate() {
427 let _ = db
428 .create_cf(name, &config.column_config(&block_opts, i as u32))
429 .map_err(other_io_err)?;
430 }
431 Ok(db)
432 }
433 err => err,
434 }
435 }
436 ok => ok,
437 };
438
439 Ok(match db {
440 Ok(db) => db,
441 Err(ref s) if is_corrupted(s) => {
442 warn!("DB corrupted: {}, attempting repair", s);
443 DB::repair(&opts, path).map_err(other_io_err)?;
444
445 let cf_descriptors: Vec<_> = (0..config.columns)
446 .map(|i| {
447 ColumnFamilyDescriptor::new(column_names[i as usize], config.column_config(&block_opts, i))
448 })
449 .collect();
450
451 DB::open_cf_descriptors(&opts, path, cf_descriptors).map_err(other_io_err)?
452 }
453 Err(s) => return Err(other_io_err(s)),
454 })
455 }
456
457 fn open_secondary(
460 opts: &Options,
461 path: &str,
462 secondary_path: &str,
463 column_names: &[String],
464 ) -> io::Result<rocksdb::DB> {
465 let db = DB::open_cf_as_secondary(&opts, path, secondary_path, column_names);
466
467 Ok(match db {
468 Ok(db) => db,
469 Err(ref s) if is_corrupted(s) => {
470 warn!("DB corrupted: {}, attempting repair", s);
471 DB::repair(&opts, path).map_err(other_io_err)?;
472 DB::open_cf_as_secondary(&opts, path, secondary_path, column_names).map_err(other_io_err)?
473 }
474 Err(s) => return Err(other_io_err(s)),
475 })
476 }
477
478 pub fn transaction(&self) -> DBTransaction {
480 DBTransaction::new()
481 }
482
483 pub fn write(&self, tr: DBTransaction) -> io::Result<()> {
485 match *self.db.read() {
486 Some(ref cfs) => {
487 let mut batch = WriteBatch::default();
488 let ops = tr.ops;
489
490 self.stats.tally_writes(ops.len() as u64);
491 self.stats.tally_transactions(1);
492
493 let mut stats_total_bytes = 0;
494
495 for op in ops {
496 let cf = cfs.cf(op.col() as usize);
497
498 match op {
499 DBOp::Insert { col: _, key, value } => {
500 stats_total_bytes += key.len() + value.len();
501 batch.put_cf(cf, &key, &value);
502 }
503 DBOp::Delete { col: _, key } => {
504 stats_total_bytes += key.len();
506 batch.delete_cf(cf, &key);
507 }
508 DBOp::DeletePrefix { col, prefix } => {
509 let end_prefix = tetsy_kvdb::end_prefix(&prefix[..]);
510 let no_end = end_prefix.is_none();
511 let end_range = end_prefix.unwrap_or_else(|| vec![u8::max_value(); 16]);
512 batch.delete_range_cf(cf, &prefix[..], &end_range[..]);
513 if no_end {
514 use crate::iter::IterationHandler as _;
515
516 let prefix = if prefix.len() > end_range.len() { &prefix[..] } else { &end_range[..] };
517 let read_opts = generate_read_options();
520 for (key, _) in cfs.iter_with_prefix(col, prefix, read_opts) {
521 batch.delete_cf(cf, &key[..]);
522 }
523 }
524 }
525 };
526 }
527 self.stats.tally_bytes_written(stats_total_bytes as u64);
528
529 check_for_corruption(&self.path, cfs.db.write_opt(batch, &self.write_opts))
530 }
531 None => Err(other_io_err("Database is closed")),
532 }
533 }
534
535 pub fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
537 match *self.db.read() {
538 Some(ref cfs) => {
539 if cfs.column_names.get(col as usize).is_none() {
540 return Err(other_io_err("column index is out of bounds"));
541 }
542 self.stats.tally_reads(1);
543 let value = cfs
544 .db
545 .get_pinned_cf_opt(cfs.cf(col as usize), key, &self.read_opts)
546 .map(|r| r.map(|v| v.to_vec()))
547 .map_err(other_io_err);
548
549 match value {
550 Ok(Some(ref v)) => self.stats.tally_bytes_read((key.len() + v.len()) as u64),
551 Ok(None) => self.stats.tally_bytes_read(key.len() as u64),
552 _ => {}
553 };
554
555 value
556 }
557 None => Ok(None),
558 }
559 }
560
561 pub fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> Option<Box<[u8]>> {
563 self.iter_with_prefix(col, prefix).next().map(|(_, v)| v)
564 }
565
566 pub fn iter<'a>(&'a self, col: u32) -> impl Iterator<Item = KeyValuePair> + 'a {
570 let read_lock = self.db.read();
571 let optional = if read_lock.is_some() {
572 let read_opts = generate_read_options();
573 let guarded = iter::ReadGuardedIterator::new(read_lock, col, read_opts);
574 Some(guarded)
575 } else {
576 None
577 };
578 optional.into_iter().flat_map(identity)
579 }
580
581 fn iter_with_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> impl Iterator<Item = iter::KeyValuePair> + 'a {
585 let read_lock = self.db.read();
586 let optional = if read_lock.is_some() {
587 let mut read_opts = generate_read_options();
588 if let Some(end_prefix) = tetsy_kvdb::end_prefix(prefix) {
590 read_opts.set_iterate_upper_bound(end_prefix);
591 }
592 let guarded = iter::ReadGuardedIterator::new_with_prefix(read_lock, col, prefix, read_opts);
593 Some(guarded)
594 } else {
595 None
596 };
597 optional.into_iter().flat_map(identity)
598 }
599
600 fn close(&self) {
602 *self.db.write() = None;
603 }
604
605 pub fn restore(&self, new_db: &str) -> io::Result<()> {
607 self.close();
608
609 match swap(new_db, &self.path) {
611 Ok(_) => {
612 let _ = fs::remove_dir_all(new_db);
614 }
615 Err(err) => {
616 debug!("DB atomic swap failed: {}", err);
617 match swap_nonatomic(new_db, &self.path) {
618 Ok(_) => {
619 let _ = fs::remove_dir_all(new_db);
621 }
622 Err(err) => {
623 warn!("Failed to swap DB directories: {:?}", err);
624 return Err(io::Error::new(
625 io::ErrorKind::Other,
626 "DB restoration failed: could not swap DB directories",
627 ));
628 }
629 }
630 }
631 }
632
633 let db = Self::open(&self.config, &self.path)?;
635 *self.db.write() = mem::replace(&mut *db.db.write(), None);
636 Ok(())
637 }
638
639 pub fn num_columns(&self) -> u32 {
641 self.db
642 .read()
643 .as_ref()
644 .and_then(|db| if db.column_names.is_empty() { None } else { Some(db.column_names.len()) })
645 .map(|n| n as u32)
646 .unwrap_or(0)
647 }
648
649 pub fn num_keys(&self, col: u32) -> io::Result<u64> {
651 const ESTIMATE_NUM_KEYS: &str = "rocksdb.estimate-num-keys";
652 match *self.db.read() {
653 Some(ref cfs) => {
654 let cf = cfs.cf(col as usize);
655 match cfs.db.property_int_value_cf(cf, ESTIMATE_NUM_KEYS) {
656 Ok(estimate) => Ok(estimate.unwrap_or_default()),
657 Err(err_string) => Err(other_io_err(err_string)),
658 }
659 }
660 None => Ok(0),
661 }
662 }
663
664 pub fn remove_last_column(&self) -> io::Result<()> {
666 match *self.db.write() {
667 Some(DBAndColumns { ref mut db, ref mut column_names }) => {
668 if let Some(name) = column_names.pop() {
669 db.drop_cf(&name).map_err(other_io_err)?;
670 }
671 Ok(())
672 }
673 None => Ok(()),
674 }
675 }
676
677 pub fn add_column(&self) -> io::Result<()> {
679 match *self.db.write() {
680 Some(DBAndColumns { ref mut db, ref mut column_names }) => {
681 let col = column_names.len() as u32;
682 let name = format!("col{}", col);
683 let col_config = self.config.column_config(&self.block_opts, col as u32);
684 let _ = db.create_cf(&name, &col_config).map_err(other_io_err)?;
685 column_names.push(name);
686 Ok(())
687 }
688 None => Ok(()),
689 }
690 }
691
692 pub fn get_statistics(&self) -> HashMap<String, stats::RocksDbStatsValue> {
694 if let Some(stats) = self.opts.get_statistics() {
695 stats::parse_rocksdb_stats(&stats)
696 } else {
697 HashMap::new()
698 }
699 }
700
701 pub fn try_catch_up_with_primary(&self) -> io::Result<()> {
722 match self.db.read().as_ref() {
723 Some(DBAndColumns { db, .. }) => db.try_catch_up_with_primary().map_err(other_io_err),
724 None => Ok(()),
725 }
726 }
727}
728
729impl KeyValueDB for Database {
732 fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
733 Database::get(self, col, key)
734 }
735
736 fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> Option<Box<[u8]>> {
737 Database::get_by_prefix(self, col, prefix)
738 }
739
740 fn write(&self, transaction: DBTransaction) -> io::Result<()> {
741 Database::write(self, transaction)
742 }
743
744 fn iter<'a>(&'a self, col: u32) -> Box<dyn Iterator<Item = KeyValuePair> + 'a> {
745 let unboxed = Database::iter(self, col);
746 Box::new(unboxed.into_iter())
747 }
748
749 fn iter_with_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> Box<dyn Iterator<Item = KeyValuePair> + 'a> {
750 let unboxed = Database::iter_with_prefix(self, col, prefix);
751 Box::new(unboxed.into_iter())
752 }
753
754 fn restore(&self, new_db: &str) -> io::Result<()> {
755 Database::restore(self, new_db)
756 }
757
758 fn io_stats(&self, kind: tetsy_kvdb::IoStatsKind) -> tetsy_kvdb::IoStats {
759 let rocksdb_stats = self.get_statistics();
760 let cache_hit_count = rocksdb_stats.get("block.cache.hit").map(|s| s.count).unwrap_or(0u64);
761 let overall_stats = self.stats.overall();
762 let old_cache_hit_count = overall_stats.raw.cache_hit_count;
763
764 self.stats.tally_cache_hit_count(cache_hit_count - old_cache_hit_count);
765
766 let taken_stats = match kind {
767 tetsy_kvdb::IoStatsKind::Overall => self.stats.overall(),
768 tetsy_kvdb::IoStatsKind::SincePrevious => self.stats.since_previous(),
769 };
770
771 let mut stats = tetsy_kvdb::IoStats::empty();
772
773 stats.reads = taken_stats.raw.reads;
774 stats.writes = taken_stats.raw.writes;
775 stats.transactions = taken_stats.raw.transactions;
776 stats.bytes_written = taken_stats.raw.bytes_written;
777 stats.bytes_read = taken_stats.raw.bytes_read;
778 stats.cache_reads = taken_stats.raw.cache_hit_count;
779 stats.started = taken_stats.started;
780 stats.span = taken_stats.started.elapsed();
781
782 stats
783 }
784}
785
#[cfg(test)]
mod tests {
    use super::*;
    use tetsy_kvdb_shared_tests as st;
    use std::io::{self, Read};
    use tempfile::Builder as TempfileBuilder;

    /// Opens a fresh database with `columns` columns inside a new temporary directory.
    fn create(columns: u32) -> io::Result<Database> {
        let dir = TempfileBuilder::new().prefix("").tempdir()?;
        let cfg = DatabaseConfig::with_columns(columns);
        let path = dir.path().to_str().expect("tempdir path is valid unicode");
        Database::open(&cfg, path)
    }

    #[test]
    fn get_fails_with_non_existing_column() -> io::Result<()> {
        st::test_get_fails_with_non_existing_column(&create(1)?)
    }

    #[test]
    fn put_and_get() -> io::Result<()> {
        st::test_put_and_get(&create(1)?)
    }

    #[test]
    fn delete_and_get() -> io::Result<()> {
        st::test_delete_and_get(&create(1)?)
    }

    #[test]
    fn delete_prefix() -> io::Result<()> {
        st::test_delete_prefix(&create(st::DELETE_PREFIX_NUM_COLUMNS)?)
    }

    #[test]
    fn iter() -> io::Result<()> {
        st::test_iter(&create(1)?)
    }

    #[test]
    fn iter_with_prefix() -> io::Result<()> {
        st::test_iter_with_prefix(&create(1)?)
    }

    #[test]
    fn complex() -> io::Result<()> {
        st::test_complex(&create(1)?)
    }

    #[test]
    fn stats() -> io::Result<()> {
        st::test_io_stats(&create(st::IO_STATS_NUM_COLUMNS)?)
    }

    /// A value written through the primary is visible through a secondary opened afterwards.
    #[test]
    fn secondary_db_get() -> io::Result<()> {
        let primary = TempfileBuilder::new().prefix("").tempdir()?;
        let primary_cfg = DatabaseConfig::with_columns(1);
        let db = Database::open(&primary_cfg, primary.path().to_str().expect("tempdir path is valid unicode"))?;

        let key1 = b"key1";
        let mut tx = db.transaction();
        tx.put(0, key1, b"horse");
        db.write(tx)?;

        let secondary_cfg = DatabaseConfig {
            secondary: TempfileBuilder::new().prefix("").tempdir()?.path().to_str().map(|s| s.to_string()),
            ..DatabaseConfig::with_columns(1)
        };
        let second_db =
            Database::open(&secondary_cfg, primary.path().to_str().expect("tempdir path is valid unicode"))?;
        assert_eq!(&*second_db.get(0, key1)?.unwrap(), b"horse");
        Ok(())
    }

    /// A secondary opened before a write sees it after catching up with the primary.
    #[test]
    fn secondary_db_catch_up() -> io::Result<()> {
        let primary = TempfileBuilder::new().prefix("").tempdir()?;
        let primary_cfg = DatabaseConfig::with_columns(1);
        let db = Database::open(&primary_cfg, primary.path().to_str().expect("tempdir path is valid unicode"))?;

        let secondary_cfg = DatabaseConfig {
            secondary: TempfileBuilder::new().prefix("").tempdir()?.path().to_str().map(|s| s.to_string()),
            ..DatabaseConfig::with_columns(1)
        };
        let second_db =
            Database::open(&secondary_cfg, primary.path().to_str().expect("tempdir path is valid unicode"))?;

        let mut tx = db.transaction();
        tx.put(0, b"key1", b"mule");
        tx.put(0, b"key2", b"cat");
        db.write(tx)?;

        second_db.try_catch_up_with_primary()?;
        assert_eq!(&*second_db.get(0, b"key2")?.unwrap(), b"cat");
        Ok(())
    }

    /// Column 0 reports a memtable size above 512 after writes to columns 1..=10.
    #[test]
    fn mem_tables_size() {
        let tempdir = TempfileBuilder::new().prefix("").tempdir().unwrap();

        let config = DatabaseConfig {
            max_open_files: 512,
            memory_budget: HashMap::new(),
            compaction: CompactionProfile::default(),
            columns: 11,
            keep_log_file_num: 1,
            enable_statistics: false,
            secondary: None,
        };

        let db = Database::open(&config, tempdir.path().to_str().unwrap()).unwrap();

        let mut batch = db.transaction();
        for i in 0u32..10000u32 {
            let column = i / 1000 + 1;
            batch.put(column, &i.to_le_bytes(), &(i * 17).to_le_bytes());
        }
        db.write(batch).unwrap();

        {
            let guard = db.db.read();
            if let Some(inner) = guard.as_ref() {
                assert!(inner.static_property_or_warn(0, "rocksdb.cur-size-all-mem-tables") > 512);
            }
        }
    }

    /// The `df` output below names `/dev/sda1`, which maps to the `sda` rotational flag file.
    #[test]
    #[cfg(target_os = "linux")]
    fn df_to_rotational() {
        use std::path::PathBuf;
        // Raw bytes of a sample `df` output.
        let example_df = vec![
            70, 105, 108, 101, 115, 121, 115, 116, 101, 109, 32, 32, 32, 32, 32, 49, 75, 45, 98, 108, 111, 99, 107,
            115, 32, 32, 32, 32, 32, 85, 115, 101, 100, 32, 65, 118, 97, 105, 108, 97, 98, 108, 101, 32, 85, 115, 101,
            37, 32, 77, 111, 117, 110, 116, 101, 100, 32, 111, 110, 10, 47, 100, 101, 118, 47, 115, 100, 97, 49, 32,
            32, 32, 32, 32, 32, 32, 54, 49, 52, 48, 57, 51, 48, 48, 32, 51, 56, 56, 50, 50, 50, 51, 54, 32, 32, 49, 57,
            52, 52, 52, 54, 49, 54, 32, 32, 54, 55, 37, 32, 47, 10,
        ];
        let expected_output = Some(PathBuf::from("/sys/block/sda/queue/rotational"));
        assert_eq!(rotational_from_df_output(example_df), expected_output);
    }

    #[test]
    #[should_panic]
    fn db_config_with_zero_columns() {
        let _cfg = DatabaseConfig::with_columns(0);
    }

    #[test]
    #[should_panic]
    fn open_db_with_zero_columns() {
        let cfg = DatabaseConfig { columns: 0, ..Default::default() };
        let _db = Database::open(&cfg, "");
    }

    /// Columns added one at a time persist and can be reopened with a matching config.
    #[test]
    fn add_columns() {
        let config_1 = DatabaseConfig::default();
        let config_5 = DatabaseConfig::with_columns(5);

        let tempdir = TempfileBuilder::new().prefix("").tempdir().unwrap();
        let path = tempdir.path().to_str().unwrap();

        {
            let db = Database::open(&config_1, path).unwrap();
            assert_eq!(db.num_columns(), 1);

            for expected in 2..=5 {
                db.add_column().unwrap();
                assert_eq!(db.num_columns(), expected);
            }
        }

        {
            let db = Database::open(&config_5, path).unwrap();
            assert_eq!(db.num_columns(), 5);
        }
    }

    /// Columns removed one at a time stay removed after reopening.
    #[test]
    fn remove_columns() {
        let config_1 = DatabaseConfig::default();
        let config_5 = DatabaseConfig::with_columns(5);

        let tempdir = TempfileBuilder::new().prefix("drop_columns").tempdir().unwrap();
        let path = tempdir.path().to_str().unwrap();

        {
            let db = Database::open(&config_5, path).expect("open with 5 columns");
            assert_eq!(db.num_columns(), 5);

            for expected in (1..5).rev() {
                db.remove_last_column().unwrap();
                assert_eq!(db.num_columns(), expected);
            }
        }

        {
            let db = Database::open(&config_1, path).unwrap();
            assert_eq!(db.num_columns(), 1);
        }
    }

    #[test]
    fn test_num_keys() {
        let tempdir = TempfileBuilder::new().prefix("").tempdir().unwrap();
        let config = DatabaseConfig::with_columns(1);
        let db = Database::open(&config, tempdir.path().to_str().unwrap()).unwrap();

        assert_eq!(db.num_keys(0).unwrap(), 0, "database is empty after creation");
        let key1 = b"beef";
        let mut batch = db.transaction();
        batch.put(0, key1, key1);
        db.write(batch).unwrap();
        assert_eq!(db.num_keys(0).unwrap(), 1, "adding a key increases the count");
    }

    #[test]
    fn default_memory_budget() {
        let c = DatabaseConfig::default();
        let default_bytes = DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB * MB;
        assert_eq!(c.columns, 1);
        assert_eq!(c.memory_budget(), default_bytes, "total memory budget is default");
        assert_eq!(c.memory_budget_for_col(0), default_bytes, "total memory budget for column 0 is the default");
        assert_eq!(c.memory_budget_for_col(999), default_bytes, "total memory budget for any column is the default");
    }

    #[test]
    fn memory_budget() {
        let mut c = DatabaseConfig::with_columns(3);
        c.memory_budget = [(0, 10), (1, 15), (2, 20)].iter().cloned().collect();
        assert_eq!(c.memory_budget(), 45 * MB, "total budget is the sum of the column budget");
    }

    #[test]
    fn test_stats_parser() {
        let raw = r#"rocksdb.row.cache.hit COUNT : 1
rocksdb.db.get.micros P50 : 2.000000 P95 : 3.000000 P99 : 4.000000 P100 : 5.000000 COUNT : 0 SUM : 15
"#;
        let stats = stats::parse_rocksdb_stats(raw);
        assert_eq!(stats["row.cache.hit"].count, 1);
        assert!(stats["row.cache.hit"].times.is_none());
        assert_eq!(stats["db.get.micros"].count, 0);
        let get_times = stats["db.get.micros"].times.unwrap();
        assert_eq!(get_times.sum, 15);
        assert_eq!(get_times.p50, 2.0);
        assert_eq!(get_times.p95, 3.0);
        assert_eq!(get_times.p99, 4.0);
        assert_eq!(get_times.p100, 5.0);
    }

    /// Opens a database with non-default settings and checks them against the RocksDB LOG file.
    #[test]
    fn rocksdb_settings() {
        const NUM_COLS: usize = 2;
        let mut cfg = DatabaseConfig { enable_statistics: true, ..DatabaseConfig::with_columns(NUM_COLS as u32) };
        cfg.max_open_files = 123;
        cfg.compaction.block_size = 323232;
        cfg.compaction.initial_file_size = 102030;
        cfg.memory_budget = [(0, 30), (1, 300)].iter().cloned().collect();

        let db_path = TempfileBuilder::new().prefix("config_test").tempdir().expect("the OS can create tmp dirs");
        let db = Database::open(&cfg, db_path.path().to_str().unwrap()).expect("can open a db");
        let mut rocksdb_log = std::fs::File::open(format!("{}/LOG", db_path.path().to_str().unwrap()))
            .expect("rocksdb creates a LOG file");
        let mut settings = String::new();
        let statistics = db.get_statistics();
        assert!(statistics.contains_key("block.cache.hit"));

        rocksdb_log.read_to_string(&mut settings).unwrap();

        // Column families.
        assert!(settings.contains("Options for column family [default]"), "no default col");
        assert!(settings.contains("Options for column family [col0]"), "no col0");
        assert!(settings.contains("Options for column family [col1]"), "no col1");

        assert!(settings.contains("max_open_files: 123"));

        assert!(settings.contains(" block_size: 323232"));

        // Block cache: one entry of 8388608 bytes, plus the configured LRU cache per column.
        assert!(settings.contains("block_cache_options:\n capacity : 8388608"));
        let lru_size = (330 * MB) / 3;
        let needle = format!("block_cache_options:\n capacity : {}", lru_size);
        let lru = settings.match_indices(&needle).count();
        assert_eq!(lru, NUM_COLS);

        let include_indexes = settings.matches("cache_index_and_filter_blocks: 1").count();
        assert_eq!(include_indexes, NUM_COLS);
        let pins = settings.matches("pin_l0_filter_and_index_blocks_in_cache: 1").count();
        assert_eq!(pins, NUM_COLS);

        // Target file size: configured value per column, and 64 MiB seen once more.
        let l0_sizes = settings.matches("target_file_size_base: 102030").count();
        assert_eq!(l0_sizes, NUM_COLS);
        assert!(settings.contains("target_file_size_base: 67108864"));

        // Compression settings appear once per column plus once more.
        let snappy_compression = settings.matches("Options.compression: Snappy").count();
        assert_eq!(snappy_compression, NUM_COLS + 1);
        let snappy_bottommost = settings.matches("Options.bottommost_compression: Disabled").count();
        assert_eq!(snappy_bottommost, NUM_COLS + 1);

        let levels = settings.matches("Options.num_levels: 7").count();
        assert_eq!(levels, NUM_COLS + 1);

        assert!(settings.contains("Options.use_fsync: 0"));

        assert!(settings.contains("format_version: 5"));
    }
}