//! Global configuration for a keyspace.
use crate::{journal::shard::RecoveryMode, Keyspace};
use lsm_tree::{descriptor_table::FileDescriptorTable, BlockCache};
use path_absolutize::Absolutize;
use std::{
path::{Path, PathBuf},
sync::Arc,
};
/// Resolves `path` to an absolute `PathBuf`.
fn absolute_path<P: AsRef<Path>>(path: P) -> PathBuf {
    // TODO: replace with https://doc.rust-lang.org/std/path/fn.absolute.html once stable
    let absolutized = path
        .as_ref()
        .absolutize()
        .expect("should be absolute path");

    absolutized.into_owned()
}
/// Global keyspace configuration
#[derive(Clone)]
pub struct Config {
    /// Base path of database
    pub(crate) path: PathBuf,

    /// Block cache that will be shared between partitions
    pub(crate) block_cache: Arc<BlockCache>,

    /// Descriptor table that will be shared between partitions
    pub(crate) descriptor_table: Arc<FileDescriptorTable>,

    /// Max size of all journals in bytes
    pub(crate) max_journaling_size_in_bytes: u64, // TODO: should be configurable during runtime: AtomicU64

    /// Max size of all active memtables
    ///
    /// This can be used to cap the memory usage if there are
    /// many (possibly inactive) partitions.
    pub(crate) max_write_buffer_size_in_bytes: u64, // TODO: should be configurable during runtime: AtomicU64

    /// Amount of concurrent flush workers
    pub(crate) flush_workers_count: usize,

    /// Amount of compaction workers
    pub(crate) compaction_workers_count: usize,

    /// Fsync every N ms asynchronously
    ///
    /// `None` disables the periodic fsync thread.
    pub(crate) fsync_ms: Option<u16>,

    /// Journal recovery mode (see [`RecoveryMode`])
    pub(crate) journal_recovery_mode: RecoveryMode,
}
/// Fallback core count used when the OS cannot report available parallelism.
const DEFAULT_CPU_CORES: usize = 4;

impl Default for Config {
    fn default() -> Self {
        // `available_parallelism` can fail (e.g. unsupported platform or
        // restricted environment); fall back to a sane default in that case.
        let queried_cores = std::thread::available_parallelism().map(usize::from);

        // Reserve 1 CPU core if possible.
        // `saturating_sub` makes the subtraction underflow-proof,
        // and `.max(1)` guarantees at least one worker.
        let cpus = queried_cores
            .unwrap_or(DEFAULT_CPU_CORES)
            .saturating_sub(1)
            .max(1);

        Self {
            path: absolute_path(".fjall_data"),
            block_cache: Arc::new(BlockCache::with_capacity_bytes(/* 16 MiB */ 16 * 1_024 * 1_024)),
            descriptor_table: Arc::new(FileDescriptorTable::new(960, 2)),
            max_write_buffer_size_in_bytes: /* 64 MiB */ 64 * 1_024 * 1_024,
            max_journaling_size_in_bytes: /* 512 MiB */ 512 * 1_024 * 1_024,
            fsync_ms: Some(1_000),
            flush_workers_count: cpus,
            compaction_workers_count: cpus,
            journal_recovery_mode: RecoveryMode::default(),
        }
    }
}
impl Config {
    /// Creates a new configuration rooted at the given path
    pub fn new<P: AsRef<Path>>(path: P) -> Self {
        let mut config = Self::default();
        config.path = absolute_path(path);
        config
    }

    /// Sets the amount of flush workers
    ///
    /// Default = # CPU cores
    #[must_use]
    pub fn flush_workers(self, n: usize) -> Self {
        Self {
            flush_workers_count: n,
            ..self
        }
    }

    /// Sets the amount of compaction workers
    ///
    /// Default = # CPU cores
    #[must_use]
    pub fn compaction_workers(self, n: usize) -> Self {
        Self {
            compaction_workers_count: n,
            ..self
        }
    }

    /// Sets the upper limit for open file descriptors.
    ///
    /// Default = 960
    ///
    /// # Panics
    ///
    /// Panics if n < 2.
    #[must_use]
    pub fn max_open_files(self, n: usize) -> Self {
        assert!(n >= 2);

        Self {
            descriptor_table: Arc::new(FileDescriptorTable::new(n, 2)),
            ..self
        }
    }

    /// Sets the block cache.
    ///
    /// Defaults to a block cache with 16 MiB of capacity
    /// shared between all partitions inside this keyspace.
    #[must_use]
    pub fn block_cache(self, block_cache: Arc<BlockCache>) -> Self {
        Self {
            block_cache,
            ..self
        }
    }

    /// Max size of all journals in bytes.
    ///
    /// Default = 512 MiB
    ///
    /// # Panics
    ///
    /// Panics if bytes < 24 MiB.
    ///
    /// This option should be at least 24 MiB, as one journal takes up at least 16 MiB, so
    /// anything less will immediately stall the system.
    ///
    /// Same as `max_total_wal_size` in `RocksDB`.
    #[must_use]
    pub fn max_journaling_size(self, bytes: u64) -> Self {
        assert!(bytes >= 24 * 1_024 * 1_024);

        Self {
            max_journaling_size_in_bytes: bytes,
            ..self
        }
    }

    /// Max size of all memtables in bytes.
    ///
    /// Similar to `db_write_buffer_size` in `RocksDB`.
    ///
    /// Default = 64 MiB
    ///
    /// # Panics
    ///
    /// Panics if bytes < 1 MiB.
    #[must_use]
    pub fn max_write_buffer_size(self, bytes: u64) -> Self {
        assert!(bytes >= 1_024 * 1_024);

        Self {
            max_write_buffer_size_in_bytes: bytes,
            ..self
        }
    }

    /// If Some, starts an fsync thread that asynchronously
    /// persists data.
    ///
    /// Default = 1 second
    ///
    /// # Panics
    ///
    /// Panics if ms is 0
    #[must_use]
    pub fn fsync_ms(self, ms: Option<u16>) -> Self {
        match ms {
            Some(ms) => assert!(ms > 0),
            None => {}
        }

        Self { fsync_ms: ms, ..self }
    }

    /// Opens a keyspace using the config.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    pub fn open(self) -> crate::Result<Keyspace> {
        Keyspace::open(self)
    }

    /// Opens a transactional keyspace using the config.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    #[cfg(feature = "single_writer_tx")]
    pub fn open_transactional(self) -> crate::Result<crate::TxKeyspace> {
        crate::TxKeyspace::open(self)
    }
}