// fjall/config.rs
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)
use crate::{journal::error::RecoveryMode, path::absolute_path, Keyspace};
use lsm_tree::{descriptor_table::FileDescriptorTable, BlobCache, BlockCache};
use std::{
path::{Path, PathBuf},
sync::Arc,
};
/// Global keyspace configuration
///
/// Shared by all partitions of a [`Keyspace`]; construct with [`Config::new`]
/// or [`Config::default`] and customize through the builder-style methods.
#[derive(Clone)]
pub struct Config {
/// Base path of database
///
/// Stored as an absolute path (see `absolute_path` in `Config::new`).
pub(crate) path: PathBuf,
/// When true, the path will be deleted upon drop
pub(crate) clean_path_on_drop: bool,
/// Block cache that will be shared between partitions
#[doc(hidden)]
pub block_cache: Arc<BlockCache>,
/// Blob cache that will be shared between partitions
#[doc(hidden)]
pub blob_cache: Arc<BlobCache>,
/// Descriptor table that will be shared between partitions
///
/// Caps the number of open file descriptors; replaced wholesale by
/// `Config::max_open_files`.
pub(crate) descriptor_table: Arc<FileDescriptorTable>,
/// Max size of all journals in bytes
pub(crate) max_journaling_size_in_bytes: u64, // TODO: should be configurable during runtime: AtomicU64
/// Max size of all active memtables
///
/// This can be used to cap the memory usage if there are
/// many (possibly inactive) partitions.
pub(crate) max_write_buffer_size_in_bytes: u64, // TODO: should be configurable during runtime: AtomicU64
// When true, journal persistence is left entirely to the user
// (see `Config::manual_journal_persist`).
pub(crate) manual_journal_persist: bool,
/// Amount of concurrent flush workers
pub(crate) flush_workers_count: usize,
/// Amount of compaction workers
pub(crate) compaction_workers_count: usize,
/// Fsync every N ms asynchronously
///
/// `None` disables the background fsync thread.
pub(crate) fsync_ms: Option<u16>,
// How strictly journals are validated on recovery.
pub(crate) journal_recovery_mode: RecoveryMode,
}
/// Fallback core count used when `std::thread::available_parallelism` fails.
const DEFAULT_CPU_CORES: usize = 4;
/// Returns a conservative default for the file descriptor table size,
/// chosen per operating system (Windows and macOS ship with lower
/// default per-process descriptor limits than Linux).
fn get_open_file_limit() -> usize {
    if cfg!(target_os = "windows") {
        400
    } else if cfg!(target_os = "macos") {
        150
    } else {
        900
    }
}
impl Default for Config {
    /// Builds the default configuration: data under `.fjall_data`,
    /// 16 MiB block/blob caches, 512 MiB journal budget, 64 MiB write
    /// buffer, and worker counts derived from the available CPU cores.
    fn default() -> Self {
        let detected_cores = std::thread::available_parallelism()
            .map(usize::from)
            .unwrap_or(DEFAULT_CPU_CORES);

        // Reserve one core for the rest of the system, but never drop to zero.
        let worker_budget = (detected_cores - 1).max(1);

        Self {
            path: absolute_path(".fjall_data"),
            clean_path_on_drop: false,
            // 16 MiB each for the shared block and blob caches
            block_cache: Arc::new(BlockCache::with_capacity_bytes(16 * 1_024 * 1_024)),
            blob_cache: Arc::new(BlobCache::with_capacity_bytes(16 * 1_024 * 1_024)),
            descriptor_table: Arc::new(FileDescriptorTable::new(get_open_file_limit(), 4)),
            // 64 MiB write buffer, 512 MiB journal budget
            max_write_buffer_size_in_bytes: 64 * 1_024 * 1_024,
            max_journaling_size_in_bytes: 512 * 1_024 * 1_024,
            fsync_ms: None,
            // Cap both worker pools at 4 threads
            flush_workers_count: worker_budget.min(4),
            compaction_workers_count: worker_budget.min(4),
            journal_recovery_mode: RecoveryMode::default(),
            manual_journal_persist: false,
        }
    }
}
impl Config {
    /// Creates a new configuration for a keyspace rooted at `path`.
    ///
    /// All other settings start at their [`Default`] values.
    pub fn new<P: AsRef<Path>>(path: P) -> Self {
        Self {
            path: absolute_path(path),
            ..Default::default()
        }
    }

    /// If `false`, write batches or transactions automatically flush data to the operating system.
    ///
    /// Default = false
    ///
    /// Set to `true` to handle persistence manually, e.g. manually using `PersistMode::SyncData` for ACID transactions.
    #[must_use]
    pub fn manual_journal_persist(mut self, flag: bool) -> Self {
        self.manual_journal_persist = flag;
        self
    }

    /// Sets the amount of flush workers
    ///
    /// Default = # CPU cores (minus one, capped at 4)
    #[must_use]
    pub fn flush_workers(mut self, n: usize) -> Self {
        self.flush_workers_count = n;
        self
    }

    /// Sets the amount of compaction workers
    ///
    /// Default = # CPU cores (minus one, capped at 4)
    #[must_use]
    pub fn compaction_workers(mut self, n: usize) -> Self {
        self.compaction_workers_count = n;
        self
    }

    /// Sets the upper limit for open file descriptors.
    ///
    /// Replaces the descriptor table shared between partitions.
    ///
    /// # Panics
    ///
    /// Panics if n < 2.
    #[must_use]
    pub fn max_open_files(mut self, n: usize) -> Self {
        assert!(n >= 2);

        self.descriptor_table = Arc::new(FileDescriptorTable::new(n, 2));
        self
    }

    /// Sets the block cache.
    ///
    /// Defaults to a block cache with 16 MiB of capacity
    /// shared between all partitions inside this keyspace.
    #[must_use]
    pub fn block_cache(mut self, block_cache: Arc<BlockCache>) -> Self {
        self.block_cache = block_cache;
        self
    }

    /// Sets the blob cache.
    ///
    /// Defaults to a blob cache with 16 MiB of capacity
    /// shared between all partitions inside this keyspace.
    #[must_use]
    pub fn blob_cache(mut self, blob_cache: Arc<BlobCache>) -> Self {
        self.blob_cache = blob_cache;
        self
    }

    /// Max size of all journals in bytes.
    ///
    /// Default = 512 MiB
    ///
    /// # Panics
    ///
    /// Panics if bytes < 24 MiB.
    ///
    /// This option should be at least 24 MiB, as one journal takes up at least 16 MiB, so
    /// anything less will immediately stall the system.
    ///
    /// Same as `max_total_wal_size` in `RocksDB`.
    #[must_use]
    pub fn max_journaling_size(mut self, bytes: u64) -> Self {
        assert!(bytes >= 24 * 1_024 * 1_024);

        self.max_journaling_size_in_bytes = bytes;
        self
    }

    /// Max size of all memtables in bytes.
    ///
    /// Similar to `db_write_buffer_size` in `RocksDB`, however it is disabled by default in `RocksDB`.
    ///
    /// Set to `u64::MAX` to disable it.
    ///
    /// Default = 64 MiB
    ///
    /// # Panics
    ///
    /// Panics if bytes < 1 MiB.
    #[must_use]
    pub fn max_write_buffer_size(mut self, bytes: u64) -> Self {
        assert!(bytes >= 1_024 * 1_024);

        self.max_write_buffer_size_in_bytes = bytes;
        self
    }

    /// If Some, starts an fsync thread that asynchronously
    /// persists data every N milliseconds.
    ///
    /// Default = None (no fsync thread)
    ///
    /// # Panics
    ///
    /// Panics if ms is 0
    #[must_use]
    pub fn fsync_ms(mut self, ms: Option<u16>) -> Self {
        if let Some(ms) = ms {
            assert!(ms > 0);
        }

        self.fsync_ms = ms;
        self
    }

    /// Opens a keyspace using the config.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    pub fn open(self) -> crate::Result<Keyspace> {
        Keyspace::open(self)
    }

    /// Opens a transactional keyspace using the config.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    #[cfg(feature = "single_writer_tx")]
    pub fn open_transactional(self) -> crate::Result<crate::TxKeyspace> {
        crate::TxKeyspace::open(self)
    }

    /// Sets the `Keyspace` to clean upon drop.
    ///
    /// # Examples
    ///
    /// ```
    /// # use fjall::{Config, PersistMode, Keyspace, PartitionCreateOptions};
    /// # let folder = tempfile::tempdir()?.into_path();
    /// let keyspace = Config::new(&folder).temporary(true).open()?;
    ///
    /// assert!(folder.try_exists()?);
    /// drop(keyspace);
    /// assert!(!folder.try_exists()?);
    /// #
    /// # Ok::<_, fjall::Error>(())
    /// ```
    #[must_use]
    pub fn temporary(mut self, flag: bool) -> Self {
        self.clean_path_on_drop = flag;
        self
    }
}