fjall/
config.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{journal::error::RecoveryMode, path::absolute_path, Keyspace};
6use lsm_tree::{descriptor_table::FileDescriptorTable, Cache};
7use std::{
8    path::{Path, PathBuf},
9    sync::Arc,
10};
11
/// Global keyspace configuration
///
/// Values are set through the builder-style methods on `Config` and consumed
/// when the keyspace is opened.
#[derive(Clone)]
pub struct Config {
    /// Base path of database
    pub(crate) path: PathBuf,

    /// When true, the path will be deleted upon drop
    pub(crate) clean_path_on_drop: bool,

    /// Cache handle, reference-counted so that clones of this config
    /// point at the same cache instance.
    ///
    /// Swapped for a freshly sized cache right before opening when
    /// `monkey_patch_cache_size` is non-zero.
    pub(crate) cache: Arc<Cache>,

    // TODO: remove in V3
    /// Sum of the capacities handed to the deprecated `block_cache` and
    /// `blob_cache` setters; `0` means none of them was used.
    monkey_patch_cache_size: u64,

    /// Descriptor table that will be shared between partitions
    pub(crate) descriptor_table: Arc<FileDescriptorTable>,

    /// Max size of all journals in bytes
    pub(crate) max_journaling_size_in_bytes: u64, // TODO: should be configurable during runtime: AtomicU64

    /// Max size of all active memtables
    ///
    /// This can be used to cap the memory usage if there are
    /// many (possibly inactive) partitions.
    pub(crate) max_write_buffer_size_in_bytes: u64, // TODO: should be configurable during runtime: AtomicU64

    /// When `false`, write batches or transactions automatically flush data
    /// to the operating system; when `true`, persistence is left to the caller.
    pub(crate) manual_journal_persist: bool,

    /// Amount of concurrent flush workers
    pub(crate) flush_workers_count: usize,

    /// Amount of compaction workers
    pub(crate) compaction_workers_count: usize,

    /// Fsync every N ms asynchronously
    ///
    /// `None` disables the periodic fsync.
    pub(crate) fsync_ms: Option<u16>,

    /// Recovery mode used for the journal.
    ///
    /// Initialised to `RecoveryMode::default()`; this file exposes no setter for it.
    // NOTE(review): the exact recovery semantics live in `journal::error::RecoveryMode` — confirm there.
    pub(crate) journal_recovery_mode: RecoveryMode,
}
51
/// Core count assumed when `std::thread::available_parallelism` returns an error.
const DEFAULT_CPU_CORES: usize = 4;
53
/// Returns the default cap on simultaneously open file descriptors.
///
/// The value is chosen per compilation target: 400 on Windows, 150 on macOS
/// and 900 everywhere else.
fn get_open_file_limit() -> usize {
    if cfg!(target_os = "windows") {
        400
    } else if cfg!(target_os = "macos") {
        150
    } else {
        900
    }
}
64
65impl Default for Config {
66    fn default() -> Self {
67        let queried_cores = std::thread::available_parallelism().map(usize::from);
68
69        // Reserve 1 CPU core if possible
70        let cpus = (queried_cores.unwrap_or(DEFAULT_CPU_CORES) - 1)
71            // Should never be 0
72            .max(1);
73
74        Self {
75            path: absolute_path(".fjall_data"),
76            clean_path_on_drop: false,
77            descriptor_table: Arc::new(FileDescriptorTable::new(get_open_file_limit(), 4)),
78            max_write_buffer_size_in_bytes: /* 64 MiB */ 64 * 1_024 * 1_024,
79            max_journaling_size_in_bytes: /* 512 MiB */ 512 * 1_024 * 1_024,
80            fsync_ms: None,
81            flush_workers_count: cpus.min(4),
82            compaction_workers_count: cpus.min(4),
83            journal_recovery_mode: RecoveryMode::default(),
84            manual_journal_persist: false,
85
86            cache: Arc::new(Cache::with_capacity_bytes(/* 32 MiB */ 32*1_024*1_024)),
87            monkey_patch_cache_size: 0,
88        }
89    }
90}
91
92impl Config {
93    /// Creates a new configuration
94    pub fn new<P: AsRef<Path>>(path: P) -> Self {
95        Self {
96            path: absolute_path(path),
97            ..Default::default()
98        }
99    }
100
101    /// If `false`, write batches or transactions automatically flush data to the operating system.
102    ///
103    /// Default = false
104    ///
105    /// Set to `true` to handle persistence manually, e.g. manually using `PersistMode::SyncData` for ACID transactions.
106    #[must_use]
107    pub fn manual_journal_persist(mut self, flag: bool) -> Self {
108        self.manual_journal_persist = flag;
109        self
110    }
111
112    /// Sets the amount of flush workers
113    ///
114    /// Default = # CPU cores
115    #[must_use]
116    pub fn flush_workers(mut self, n: usize) -> Self {
117        self.flush_workers_count = n;
118        self
119    }
120
121    /// Sets the amount of compaction workers
122    ///
123    /// Default = # CPU cores
124    #[must_use]
125    pub fn compaction_workers(mut self, n: usize) -> Self {
126        self.compaction_workers_count = n;
127        self
128    }
129
130    /// Sets the upper limit for open file descriptors.
131    ///
132    /// # Panics
133    ///
134    /// Panics if n < 2.
135    #[must_use]
136    pub fn max_open_files(mut self, n: usize) -> Self {
137        assert!(n >= 2);
138
139        self.descriptor_table = Arc::new(FileDescriptorTable::new(n, 2));
140        self
141    }
142
143    // TODO: remove in V3
144    /// Sets the block cache.
145    ///
146    /// Defaults to a block cache with 16 MiB of capacity
147    /// shared between all partitions inside this keyspace.
148    #[must_use]
149    #[deprecated = "Use Config::cache_size instead"]
150    #[allow(deprecated)]
151    pub fn block_cache(mut self, block_cache: Arc<crate::BlockCache>) -> Self {
152        self.monkey_patch_cache_size += block_cache.capacity();
153        self
154    }
155
156    // TODO: remove in V3
157    /// Sets the blob cache.
158    ///
159    /// Defaults to a block cache with 16 MiB of capacity
160    /// shared between all partitions inside this keyspace.
161    #[must_use]
162    #[deprecated = "Use Config::cache_size instead"]
163    #[allow(deprecated)]
164    pub fn blob_cache(mut self, blob_cache: Arc<crate::BlobCache>) -> Self {
165        self.monkey_patch_cache_size += blob_cache.capacity();
166        self
167    }
168
169    /// Sets the cache capacity in bytes.
170    #[must_use]
171    pub fn cache_size(mut self, size_bytes: u64) -> Self {
172        self.monkey_patch_cache_size = 0;
173        self.cache = Arc::new(Cache::with_capacity_bytes(size_bytes));
174        self
175    }
176
177    /// Max size of all journals in bytes.
178    ///
179    /// Default = 512 MiB
180    ///
181    /// # Panics
182    ///
183    /// Panics if bytes < 24 MiB.
184    ///
185    /// This option should be at least 24 MiB, as one journal takes up at least 16 MiB, so
186    /// anything less will immediately stall the system.
187    ///
188    /// Same as `max_total_wal_size` in `RocksDB`.
189    #[must_use]
190    pub fn max_journaling_size(mut self, bytes: u64) -> Self {
191        assert!(bytes >= 24 * 1_024 * 1_024);
192
193        self.max_journaling_size_in_bytes = bytes;
194        self
195    }
196
197    /// Max size of all memtables in bytes.
198    ///
199    /// Similar to `db_write_buffer_size` in `RocksDB`, however it is disabled by default in `RocksDB`.
200    ///
201    /// Set to `u64::MAX` to disable it.
202    ///
203    /// Default = 64 MiB
204    ///
205    /// # Panics
206    ///
207    /// Panics if bytes < 1 MiB.
208    #[must_use]
209    pub fn max_write_buffer_size(mut self, bytes: u64) -> Self {
210        assert!(bytes >= 1_024 * 1_024);
211
212        self.max_write_buffer_size_in_bytes = bytes;
213        self
214    }
215
216    /// If Some, starts an fsync thread that asynchronously
217    /// persists data to disk (using fsync).
218    ///
219    /// Default = off
220    ///
221    /// # Panics
222    ///
223    /// Panics if ms is 0.
224    #[must_use]
225    pub fn fsync_ms(mut self, ms: Option<u16>) -> Self {
226        if let Some(ms) = ms {
227            assert!(ms > 0);
228        }
229
230        self.fsync_ms = ms;
231        self
232    }
233
234    /// Opens a keyspace using the config.
235    ///
236    /// # Errors
237    ///
238    /// Will return `Err` if an IO error occurs.
239    pub fn open(mut self) -> crate::Result<Keyspace> {
240        // TODO: remove in V3
241        if self.monkey_patch_cache_size > 0 {
242            self.cache = Arc::new(Cache::with_capacity_bytes(self.monkey_patch_cache_size));
243        }
244        Keyspace::open(self)
245    }
246
247    /// Opens a transactional keyspace using the config.
248    ///
249    /// # Errors
250    ///
251    /// Will return `Err` if an IO error occurs.
252    #[cfg(any(feature = "single_writer_tx", feature = "ssi_tx"))]
253    pub fn open_transactional(mut self) -> crate::Result<crate::TxKeyspace> {
254        // TODO: remove in V3
255        if self.monkey_patch_cache_size > 0 {
256            self.cache = Arc::new(Cache::with_capacity_bytes(self.monkey_patch_cache_size));
257        }
258        crate::TxKeyspace::open(self)
259    }
260
261    /// Sets the `Keyspace` to clean upon drop.
262    ///
263    /// # Examples
264    ///
265    /// ```
266    /// # use fjall::{Config, PersistMode, Keyspace, PartitionCreateOptions};
267    /// # let folder = tempfile::tempdir()?.into_path();
268    /// let keyspace = Config::new(&folder).temporary(true).open()?;
269    ///
270    /// assert!(folder.try_exists()?);
271    /// drop(keyspace);
272    /// assert!(!folder.try_exists()?);
273    /// #
274    /// # Ok::<_, fjall::Error>(())
275    /// ```
276    #[must_use]
277    pub fn temporary(mut self, flag: bool) -> Self {
278        self.clean_path_on_drop = flag;
279        self
280    }
281}