Skip to main content

mace/utils/
options.rs

1use std::{
2    ops::Deref,
3    path::{Path, PathBuf},
4    sync::Arc,
5};
6
7use crate::{
8    io,
9    utils::observe::{NoopObserver, Observer},
10};
11
12use super::OpCode;
13
/// Returns the directory that has to be fsynced so that the entry for `path` is persisted.
///
/// A bare relative name such as `"db"` has an empty parent; that is mapped to the current
/// directory (`"."`) so there is still something that can be opened and synced. Yields `None`
/// when `path` has no parent at all (for example a filesystem root or the empty path).
fn dir_parent_for_sync(path: &Path) -> Option<&Path> {
    let parent = path.parent()?;
    if parent.as_os_str().is_empty() {
        Some(Path::new("."))
    } else {
        Some(parent)
    }
}
21
22fn create_dir_all_and_sync(path: &Path) -> std::io::Result<()> {
23    if path.exists() {
24        return Ok(());
25    }
26
27    let mut created = Vec::new();
28    let mut cursor = Some(path);
29    while let Some(dir) = cursor {
30        if dir.exists() {
31            break;
32        }
33        created.push(dir.to_path_buf());
34        cursor = dir.parent();
35    }
36
37    std::fs::create_dir_all(path)?;
38
39    created.reverse();
40    for dir in created {
41        if let Some(parent) = dir_parent_for_sync(&dir) {
42            io::sync_dir(parent)?;
43        }
44    }
45    Ok(())
46}
47
/// Configuration options for the Mace storage engine.
///
/// Build with [`Options::new`], adjust the public fields, then call [`Options::validate`] to
/// normalize them and create the on-disk directory layout.
#[derive(Clone)]
pub struct Options {
    /// Force-sync data to disk on every WAL/data write. Default is `true`.
    ///
    /// Turning it off may lose recently written data on a crash; turning it on may reduce
    /// throughput. NOTE(review): earlier docs said `true` uses fsync and `false` falls back to
    /// fdatasync — confirm the exact syscalls in the `io` layer.
    pub sync_on_write: bool,
    /// Number of writer groups. Default is [`Self::CONCURRENT_WRITE`].
    ///
    /// `validate` clamps it to `[1, Self::MAX_CONCURRENT_WRITE]` and rounds it up to the next
    /// power of two.
    ///
    /// **Once set, it cannot be modified.**
    pub concurrent_write: u8,
    /// Garbage-collection cycle interval, in milliseconds. Default is `60_000` (1 minute).
    pub gc_timeout: u64,
    /// Proactive page-checkpoint trigger interval, in milliseconds. Default is `60_000`.
    ///
    /// When a bucket has pending dirty pages but no foreground write reaches the checkpoint
    /// thresholds, the evictor triggers a checkpoint near this interval so that WAL
    /// checkpointing does not stall.
    ///
    /// Set to 0 to disable proactive triggering.
    pub checkpoint_nudge_ms: u64,
    /// Compact data files once their garbage ratio (percent, `[0, 100]`) exceeds this value.
    /// Default is `20`. Not clamped by `validate`.
    pub data_garbage_ratio: u32,
    /// If true, compact immediately when [`Self::data_garbage_ratio`] is reached.
    /// Default is `true`.
    pub gc_eager: bool,
    /// Size limit of a blob file. Default is [`Self::BLOB_FILE_SIZE`]; `validate` replaces `0`
    /// with that default.
    pub blob_file_size: usize,
    /// Trigger blob GC once the garbage ratio (percent, `[0, 100]`) exceeds this value.
    /// Default is `50`. Not clamped by `validate`.
    pub blob_garbage_ratio: usize,
    /// At each blob-GC cycle, the lowest-utilization [`Self::blob_gc_ratio`] percent of blob
    /// files are picked as candidates. Default is `25`.
    pub blob_gc_ratio: usize,
    /// Whether this is temporary storage. Default is `false`.
    ///
    /// If true, `db_root` is removed when the validated options (`ParsedOptions`) are dropped.
    pub tmp_store: bool,
    /// Directory where database files are stored.
    pub(crate) db_root: PathBuf,
    /// Directory where log files (WAL, stable WAL, manifest) are stored.
    ///
    /// `Options::new` sets it equal to `db_root`; while the two are equal, `log_root()`
    /// resolves to `db_root/log`. Note that explicitly setting it to `db_root` therefore also
    /// yields `db_root/log`.
    pub log_root: PathBuf,
    /// Shared logical-address cache capacity in bytes. Default is [`Self::LRU_CAPACITY`];
    /// `validate` replaces `0` with that default.
    ///
    /// This cache keeps file-loaded blob values and auxiliary history/sibling pages.
    /// Resident tree pages and dirty pool pages are accounted elsewhere and are not inserted here.
    /// Trimming is best-effort and happens in small rounds, so short-term overshoot is possible.
    ///
    /// Different subsystems may transiently hold refs to the same allocation.
    pub lru_capacity: usize,
    /// Bitmap-cache entry count for data and blob stats. Default is
    /// [`Self::STAT_MASK_CACHE_CNT`]; `validate` replaces `0` with that default.
    pub stat_mask_cache_count: usize,
    /// Maximum number of open data-file handles cached concurrently, used for loading data
    /// pages. Default is `128`.
    pub data_handle_cache_capacity: usize,
    /// Maximum number of open blob-file handles cached concurrently, used for loading blob
    /// pages. Default is `128`.
    pub blob_handle_cache_capacity: usize,
    /// Size limit of a data file. Default is [`Self::DATA_FILE_SIZE`]; `validate` replaces `0`
    /// with that default.
    ///
    /// NOTE(review): this was documented as a minimum, but `validate` does not raise smaller
    /// non-zero values — confirm whether they are rejected elsewhere.
    pub data_file_size: usize,
    /// WAL ring-buffer size. Default is [`Self::WAL_BUF_SZ`].
    ///
    /// Must be greater than the page size and a power of two. NOTE(review): `validate` does not
    /// check this — confirm it is enforced where the buffer is allocated.
    pub wal_buffer_size: usize,
    /// Number of checkpoints a transaction can span (i.e. the transaction length limit).
    /// Default is `1_000_000`.
    ///
    /// If a transaction exceeds this limit, it is forcibly aborted.
    pub max_ckpt_per_txn: usize,
    /// WAL file size that triggers switching to a new WAL file. Default is
    /// [`Self::WAL_FILE_SZ`]; documented as limited to 2GB, although the `u32` type itself
    /// allows more and `validate` does not check it.
    pub wal_file_size: u32,
    /// Controls retention of stable WAL files (`stable-wal_<group>_<seq>`) that are never used
    /// in recovery. Default is `false`.
    ///
    /// NOTE(review): the previous doc read "If true, remove unused stable WAL files", which
    /// contradicts the field name — confirm the polarity against the WAL cleanup code.
    pub keep_stable_wal_file: bool,
    /// If true, corrupted WAL is truncated during recovery; otherwise recovery panics.
    ///
    /// Default is `true`.
    pub truncate_corrupted_wal: bool,
    /// Observability callback. Default is a no-op observer (`NoopObserver`).
    pub observer: Arc<dyn Observer>,
}
124
/// Per-bucket tuning knobs.
///
/// The struct is `#[repr(C)]` with an explicit `_padding` tail, so the field order is part of
/// its layout and must not be rearranged. Obtain defaults via [`BucketOptions::new`] /
/// [`Default`] and normalize user-supplied values with [`BucketOptions::validate`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(C)]
pub struct BucketOptions {
    /// Per-bucket target resident bytes for mapped B+Tree pages. Default is
    /// [`Self::CACHE_CAP`]; `0` is replaced by that default in `validate`.
    pub cache_capacity: usize,
    /// Percentage of items evicted per round, clamped to `[10, 80]`. Default is `20`.
    pub cache_evict_pct: usize,
    /// Per-bucket pool target bytes. Default is [`Self::POOL_CAP`]; `0` means default.
    pub pool_capacity: usize,
    /// Maximum bytes a single checkpoint round should emit. Default is
    /// [`Self::CHECKPOINT_SIZE`]; `0` means default, and `validate` caps it at `pool_capacity`.
    pub checkpoint_size: usize,
    /// Inline threshold for values, clamped to
    /// `[Self::MIN_INLINE_SIZE, Self::MAX_INLINE_SIZE]`. Default is [`Self::MIN_INLINE_SIZE`].
    ///
    /// Branch nodes always inline keys and indexes. In leaf nodes, values smaller than
    /// [`Self::MIN_INLINE_SIZE`] are always inlined and values larger than
    /// [`Self::MAX_INLINE_SIZE`] are always stored as blobs.
    pub inline_size: usize,
    /// Maximum number of elements in an SST (B+Tree node), clamped to
    /// `[Self::MIN_SPLIT_ELEMS, Self::MAX_SPLIT_ELEMS]`. Default is [`Self::MAX_SPLIT_ELEMS`].
    pub split_elems: u16,
    /// Threshold for consolidating delta chains, clamped to `[16, split_elems / 2]`.
    /// Default is `Self::MAX_SPLIT_ELEMS / 2`.
    pub consolidate_threshold: u16,
    /// Enable foreground write backpressure. Default is `true`.
    pub enable_backpressure: bool,
    /// Explicit tail padding so the `repr(C)` layout has no implicit padding bytes;
    /// `new` sets it to zeros.
    pub _padding: [u8; 3],
}

impl Default for BucketOptions {
    fn default() -> Self {
        BucketOptions::new()
    }
}

impl BucketOptions {
    pub const MIN_SPLIT_ELEMS: u16 = 64;
    pub const MAX_SPLIT_ELEMS: u16 = 512;
    /// 1 GiB.
    pub const CACHE_CAP: usize = 1 << 30;
    /// 1 GiB.
    pub const POOL_CAP: usize = 1 << 30;
    /// 256 MiB.
    pub const CHECKPOINT_SIZE: usize = 256 << 20;
    pub const MIN_INLINE_SIZE: usize = 4096;
    pub const MAX_INLINE_SIZE: usize = 16384;

    /// Returns the default bucket configuration.
    pub fn new() -> Self {
        Self {
            cache_capacity: Self::CACHE_CAP,
            cache_evict_pct: 20,
            pool_capacity: Self::POOL_CAP,
            checkpoint_size: Self::CHECKPOINT_SIZE,
            inline_size: Self::MIN_INLINE_SIZE,
            split_elems: Self::MAX_SPLIT_ELEMS,
            consolidate_threshold: Self::MAX_SPLIT_ELEMS / 2,
            enable_backpressure: true,
            _padding: [0u8; 3],
        }
    }

    /// Normalizes the options into their supported ranges; never fails.
    ///
    /// Zero sizes fall back to their defaults, `checkpoint_size` is capped at `pool_capacity`,
    /// and every other numeric knob is clamped into the range documented on its field.
    /// `enable_backpressure` and `_padding` are passed through unchanged.
    pub fn validate(self) -> BucketOptions {
        let nonzero_or = |value: usize, fallback: usize| if value == 0 { fallback } else { value };

        let pool_capacity = nonzero_or(self.pool_capacity, Self::POOL_CAP);
        // Cap a checkpoint round at the (already defaulted) pool size.
        let checkpoint_size =
            nonzero_or(self.checkpoint_size, Self::CHECKPOINT_SIZE).min(pool_capacity);
        let split_elems = self
            .split_elems
            .clamp(Self::MIN_SPLIT_ELEMS, Self::MAX_SPLIT_ELEMS);

        BucketOptions {
            cache_capacity: nonzero_or(self.cache_capacity, Self::CACHE_CAP),
            cache_evict_pct: self.cache_evict_pct.clamp(10, 80),
            pool_capacity,
            checkpoint_size,
            inline_size: self
                .inline_size
                .clamp(Self::MIN_INLINE_SIZE, Self::MAX_INLINE_SIZE),
            split_elems,
            // Bounded by the *clamped* split size: its half is at least 32, so the range
            // `[16, split_elems / 2]` is never inverted.
            consolidate_threshold: self.consolidate_threshold.clamp(16, split_elems / 2),
            ..self
        }
    }

    /// Maximum delta-chain length, derived as a quarter of `split_elems`.
    pub(crate) fn max_delta_len(&self) -> usize {
        usize::from(self.split_elems) / 4
    }
}
206
207impl Options {
208    pub const CONCURRENT_WRITE: u8 = 16;
209    pub const MAX_CONCURRENT_WRITE: u8 = 128;
210    pub const DATA_FILE_SIZE: usize = 64 << 20; // 64MB
211    pub const BLOB_FILE_SIZE: usize = 256 << 20; // 256MB
212    pub const LRU_CAPACITY: usize = 256 << 20; // 256MB
213    // Assuming a MemData/BlobStat is 32 KB, 16,384 stats use ~512 MB of memory, which is reasonable.
214    pub const STAT_MASK_CACHE_CNT: usize = 16384;
215    pub const WAL_BUF_SZ: usize = 16 << 20; // 16MB
216    pub const WAL_FILE_SZ: usize = 64 << 20; // 64MB
217
218    pub(crate) const MAX_KEY_SIZE: usize = 64 << 10;
219    pub(crate) const MAX_KV_SIZE: usize = 1 << 30; // 1GB
220
221    /// Creates a new Options instance with default values and the given database root.
222    pub fn new<P: AsRef<Path>>(db_root: P) -> Self {
223        Self {
224            sync_on_write: true,
225            concurrent_write: Self::CONCURRENT_WRITE,
226            tmp_store: false,
227            gc_timeout: 60 * 1000,          // 1min
228            checkpoint_nudge_ms: 60 * 1000, // 1min
229            data_garbage_ratio: 20,         // 20%
230            gc_eager: true,
231            blob_file_size: Self::BLOB_FILE_SIZE,
232            blob_garbage_ratio: 50, // 50%
233            blob_gc_ratio: 25,      // 25%
234            db_root: db_root.as_ref().to_path_buf(),
235            log_root: db_root.as_ref().to_path_buf(),
236            lru_capacity: Self::LRU_CAPACITY,
237            stat_mask_cache_count: Self::STAT_MASK_CACHE_CNT,
238            data_handle_cache_capacity: 128,
239            blob_handle_cache_capacity: 128,
240            data_file_size: Self::DATA_FILE_SIZE,
241            wal_buffer_size: Self::WAL_BUF_SZ,
242            max_ckpt_per_txn: 1_000_000, // 1 million
243            wal_file_size: Self::WAL_FILE_SZ as u32,
244            keep_stable_wal_file: false,
245            truncate_corrupted_wal: true,
246            observer: Arc::new(NoopObserver),
247        }
248    }
249
250    /// Validates the options and returns a ParsedOptions instance.
251    pub fn validate(mut self) -> Result<ParsedOptions, OpCode> {
252        self.concurrent_write = self
253            .concurrent_write
254            .clamp(1, Self::MAX_CONCURRENT_WRITE)
255            .next_power_of_two();
256        if self.stat_mask_cache_count == 0 {
257            self.stat_mask_cache_count = Self::STAT_MASK_CACHE_CNT;
258        }
259        if self.lru_capacity == 0 {
260            self.lru_capacity = Self::LRU_CAPACITY;
261        }
262        if self.data_file_size == 0 {
263            self.data_file_size = Self::DATA_FILE_SIZE;
264        }
265        if self.blob_file_size == 0 {
266            self.blob_file_size = Self::BLOB_FILE_SIZE;
267        }
268
269        self.create_dir().map_err(|e| {
270            eprintln!("create dir fail {e:?}");
271            OpCode::IoError
272        })?;
273        Ok(ParsedOptions { inner: self })
274    }
275
276    /// Creates the directory structure for the database.
277    pub fn create_dir(&self) -> std::io::Result<()> {
278        let (db_root, data_root, log_root) = (self.db_root(), self.data_root(), self.log_root());
279
280        if !db_root.exists() {
281            create_dir_all_and_sync(&db_root)?;
282        }
283        if !data_root.exists() {
284            create_dir_all_and_sync(&data_root)?;
285        }
286        if !log_root.exists() {
287            create_dir_all_and_sync(&log_root)?;
288        }
289        Ok(())
290    }
291}
292
293pub struct ParsedOptions {
294    pub(crate) inner: Options,
295}
296
297impl Deref for ParsedOptions {
298    type Target = Options;
299    fn deref(&self) -> &Self::Target {
300        &self.inner
301    }
302}
303
304impl Options {
305    pub const SEP: &'static str = "_";
306    pub const DATA_PREFIX: &'static str = "data";
307    pub const BLOB_PREFIX: &'static str = "blob";
308    pub const WAL_PREFIX: &'static str = "wal";
309    pub const WAL_STABLE: &'static str = "stable-wal";
310    pub const MANIFEST: &'static str = "manifest";
311
312    pub fn data_root(&self) -> PathBuf {
313        self.db_root().join("data")
314    }
315
316    pub fn data_file(&self, id: u64) -> PathBuf {
317        self.data_root()
318            .join(format!("{}{}{}", Self::DATA_PREFIX, Self::SEP, id))
319    }
320
321    pub fn blob_file(&self, id: u64) -> PathBuf {
322        self.data_root()
323            .join(format!("{}{}{}", Self::BLOB_PREFIX, Self::SEP, id))
324    }
325
326    pub fn log_root(&self) -> PathBuf {
327        if self.log_root == self.db_root {
328            self.db_root.join("log")
329        } else {
330            self.log_root.clone()
331        }
332    }
333
334    pub fn db_root(&self) -> PathBuf {
335        self.db_root.clone()
336    }
337
338    pub fn wal_file(&self, group_id: u8, seq: u64) -> PathBuf {
339        self.log_root().join(format!(
340            "{}{}{}{}{}",
341            Self::WAL_PREFIX,
342            Self::SEP,
343            group_id,
344            Self::SEP,
345            seq
346        ))
347    }
348
349    pub fn wal_backup(&self, group_id: u8, seq: u64) -> PathBuf {
350        self.log_root().join(format!(
351            "{}{}{}{}{}",
352            Self::WAL_STABLE,
353            Self::SEP,
354            group_id,
355            Self::SEP,
356            seq
357        ))
358    }
359
360    pub fn manifest(&self) -> PathBuf {
361        self.log_root().join(Self::MANIFEST)
362    }
363
364    pub(crate) fn sync_data_dir(&self) {
365        io::sync_dir(self.data_root()).unwrap_or_else(|e| panic!("can't fail, {:?}", e));
366    }
367
368    pub(crate) fn sync_log_dir(&self) {
369        io::sync_dir(self.log_root()).unwrap_or_else(|e| panic!("can't fail, {:?}", e));
370    }
371}
372
373impl Drop for ParsedOptions {
374    fn drop(&mut self) {
375        if self.inner.tmp_store {
376            log::info!("remove db_root {:?}", self.inner.db_root);
377            let _ = std::fs::remove_dir_all(&self.inner.db_root);
378        }
379    }
380}