1use std::{
2 ops::Deref,
3 path::{Path, PathBuf},
4 sync::Arc,
5};
6
7use crate::{
8 io,
9 utils::observe::{NoopObserver, Observer},
10};
11
12use super::OpCode;
13
/// Returns the directory that has to be synced after `path` itself was created.
///
/// * `None` when `path` has no parent (e.g. a root or the empty path).
/// * `Some(".")` when the parent is the empty path, i.e. `path` is a bare
///   relative name such as `"db"`.
/// * `Some(parent)` otherwise.
fn dir_parent_for_sync(path: &Path) -> Option<&Path> {
    let parent = path.parent()?;
    if parent.as_os_str().is_empty() {
        // An empty parent cannot be opened; use the current directory instead.
        Some(Path::new("."))
    } else {
        Some(parent)
    }
}
21
22fn create_dir_all_and_sync(path: &Path) -> std::io::Result<()> {
23 if path.exists() {
24 return Ok(());
25 }
26
27 let mut created = Vec::new();
28 let mut cursor = Some(path);
29 while let Some(dir) = cursor {
30 if dir.exists() {
31 break;
32 }
33 created.push(dir.to_path_buf());
34 cursor = dir.parent();
35 }
36
37 std::fs::create_dir_all(path)?;
38
39 created.reverse();
40 for dir in created {
41 if let Some(parent) = dir_parent_for_sync(&dir) {
42 io::sync_dir(parent)?;
43 }
44 }
45 Ok(())
46}
47
48#[derive(Clone)]
50pub struct Options {
51 pub sync_on_write: bool,
56 pub concurrent_write: u8,
60 pub gc_timeout: u64,
62 pub checkpoint_nudge_ms: u64,
69 pub data_garbage_ratio: u32,
71 pub gc_eager: bool,
73 pub blob_file_size: usize,
75 pub blob_garbage_ratio: usize,
77 pub blob_gc_ratio: usize,
79 pub tmp_store: bool,
83 pub(crate) db_root: PathBuf,
85 pub log_root: PathBuf,
89 pub lru_capacity: usize,
97 pub stat_mask_cache_count: usize,
99 pub data_handle_cache_capacity: usize,
101 pub blob_handle_cache_capacity: usize,
103 pub data_file_size: usize,
105 pub wal_buffer_size: usize,
107 pub max_ckpt_per_txn: usize,
111 pub wal_file_size: u32,
113 pub keep_stable_wal_file: bool,
117 pub truncate_corrupted_wal: bool,
121 pub observer: Arc<dyn Observer>,
123}
124
/// Plain-data tuning options, passed around by value.
///
/// The struct is `#[repr(C)]` and ends in an explicit `_padding` field, so the
/// declaration order of the fields is part of its layout.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(C)]
pub struct BucketOptions {
    /// Default [`Self::CACHE_CAP`]; `validate` replaces `0` with the default.
    pub cache_capacity: usize,
    /// Default `20`; `validate` clamps it to `10..=80`.
    pub cache_evict_pct: usize,
    /// Default [`Self::POOL_CAP`]; `validate` replaces `0` with the default.
    pub pool_capacity: usize,
    /// Default [`Self::CHECKPOINT_SIZE`]; `validate` replaces `0` with the
    /// default and caps the value at `pool_capacity`.
    pub checkpoint_size: usize,
    /// Default [`Self::MIN_INLINE_SIZE`]; `validate` clamps it to
    /// `MIN_INLINE_SIZE..=MAX_INLINE_SIZE`.
    pub inline_size: usize,
    /// Default [`Self::MAX_SPLIT_ELEMS`]; `validate` clamps it to
    /// `MIN_SPLIT_ELEMS..=MAX_SPLIT_ELEMS`.
    pub split_elems: u16,
    /// Default `MAX_SPLIT_ELEMS / 2`; `validate` clamps it to
    /// `16..=split_elems / 2`.
    pub consolidate_threshold: u16,
    /// Default `true`.
    pub enable_backpressure: bool,
    /// Explicit tail padding; zeroed by `new`.
    pub _padding: [u8; 3],
}

impl Default for BucketOptions {
    /// Same as [`BucketOptions::new`].
    fn default() -> Self {
        BucketOptions::new()
    }
}

impl BucketOptions {
    pub const MIN_SPLIT_ELEMS: u16 = 64;
    pub const MAX_SPLIT_ELEMS: u16 = 512;
    pub const CACHE_CAP: usize = 1 << 30;
    pub const POOL_CAP: usize = 1 << 30;
    pub const CHECKPOINT_SIZE: usize = 256 << 20;
    pub const MIN_INLINE_SIZE: usize = 4096;
    pub const MAX_INLINE_SIZE: usize = 16384;

    /// Builds the default option set.
    pub fn new() -> Self {
        Self {
            cache_capacity: Self::CACHE_CAP,
            cache_evict_pct: 20,
            pool_capacity: Self::POOL_CAP,
            checkpoint_size: Self::CHECKPOINT_SIZE,
            inline_size: Self::MIN_INLINE_SIZE,
            split_elems: Self::MAX_SPLIT_ELEMS,
            consolidate_threshold: Self::MAX_SPLIT_ELEMS / 2,
            enable_backpressure: true,
            _padding: [0u8; 3],
        }
    }

    /// Normalises the options and returns the corrected copy.
    ///
    /// Zero sizes fall back to their defaults, `checkpoint_size` is capped at
    /// `pool_capacity`, and the remaining numeric knobs are clamped into their
    /// supported ranges.
    pub fn validate(mut self) -> BucketOptions {
        // A zero size is treated as "not configured".
        let non_zero_or = |value: usize, fallback: usize| if value == 0 { fallback } else { value };

        self.cache_capacity = non_zero_or(self.cache_capacity, Self::CACHE_CAP);
        self.pool_capacity = non_zero_or(self.pool_capacity, Self::POOL_CAP);
        self.checkpoint_size =
            non_zero_or(self.checkpoint_size, Self::CHECKPOINT_SIZE).min(self.pool_capacity);

        self.cache_evict_pct = self.cache_evict_pct.clamp(10, 80);

        let split = self
            .split_elems
            .clamp(Self::MIN_SPLIT_ELEMS, Self::MAX_SPLIT_ELEMS);
        self.split_elems = split;
        // The upper bound depends on the already-clamped split size.
        self.consolidate_threshold = self.consolidate_threshold.clamp(16, split / 2);

        self.inline_size = self
            .inline_size
            .clamp(Self::MIN_INLINE_SIZE, Self::MAX_INLINE_SIZE);
        self
    }

    /// A quarter of `split_elems`.
    pub(crate) fn max_delta_len(&self) -> usize {
        usize::from(self.split_elems) / 4
    }
}
206
207impl Options {
208 pub const CONCURRENT_WRITE: u8 = 16;
209 pub const MAX_CONCURRENT_WRITE: u8 = 128;
210 pub const DATA_FILE_SIZE: usize = 64 << 20; pub const BLOB_FILE_SIZE: usize = 256 << 20; pub const LRU_CAPACITY: usize = 256 << 20; pub const STAT_MASK_CACHE_CNT: usize = 16384;
215 pub const WAL_BUF_SZ: usize = 16 << 20; pub const WAL_FILE_SZ: usize = 64 << 20; pub(crate) const MAX_KEY_SIZE: usize = 64 << 10;
219 pub(crate) const MAX_KV_SIZE: usize = 1 << 30; pub fn new<P: AsRef<Path>>(db_root: P) -> Self {
223 Self {
224 sync_on_write: true,
225 concurrent_write: Self::CONCURRENT_WRITE,
226 tmp_store: false,
227 gc_timeout: 60 * 1000, checkpoint_nudge_ms: 60 * 1000, data_garbage_ratio: 20, gc_eager: true,
231 blob_file_size: Self::BLOB_FILE_SIZE,
232 blob_garbage_ratio: 50, blob_gc_ratio: 25, db_root: db_root.as_ref().to_path_buf(),
235 log_root: db_root.as_ref().to_path_buf(),
236 lru_capacity: Self::LRU_CAPACITY,
237 stat_mask_cache_count: Self::STAT_MASK_CACHE_CNT,
238 data_handle_cache_capacity: 128,
239 blob_handle_cache_capacity: 128,
240 data_file_size: Self::DATA_FILE_SIZE,
241 wal_buffer_size: Self::WAL_BUF_SZ,
242 max_ckpt_per_txn: 1_000_000, wal_file_size: Self::WAL_FILE_SZ as u32,
244 keep_stable_wal_file: false,
245 truncate_corrupted_wal: true,
246 observer: Arc::new(NoopObserver),
247 }
248 }
249
250 pub fn validate(mut self) -> Result<ParsedOptions, OpCode> {
252 self.concurrent_write = self
253 .concurrent_write
254 .clamp(1, Self::MAX_CONCURRENT_WRITE)
255 .next_power_of_two();
256 if self.stat_mask_cache_count == 0 {
257 self.stat_mask_cache_count = Self::STAT_MASK_CACHE_CNT;
258 }
259 if self.lru_capacity == 0 {
260 self.lru_capacity = Self::LRU_CAPACITY;
261 }
262 if self.data_file_size == 0 {
263 self.data_file_size = Self::DATA_FILE_SIZE;
264 }
265 if self.blob_file_size == 0 {
266 self.blob_file_size = Self::BLOB_FILE_SIZE;
267 }
268
269 self.create_dir().map_err(|e| {
270 eprintln!("create dir fail {e:?}");
271 OpCode::IoError
272 })?;
273 Ok(ParsedOptions { inner: self })
274 }
275
276 pub fn create_dir(&self) -> std::io::Result<()> {
278 let (db_root, data_root, log_root) = (self.db_root(), self.data_root(), self.log_root());
279
280 if !db_root.exists() {
281 create_dir_all_and_sync(&db_root)?;
282 }
283 if !data_root.exists() {
284 create_dir_all_and_sync(&data_root)?;
285 }
286 if !log_root.exists() {
287 create_dir_all_and_sync(&log_root)?;
288 }
289 Ok(())
290 }
291}
292
293pub struct ParsedOptions {
294 pub(crate) inner: Options,
295}
296
297impl Deref for ParsedOptions {
298 type Target = Options;
299 fn deref(&self) -> &Self::Target {
300 &self.inner
301 }
302}
303
304impl Options {
305 pub const SEP: &'static str = "_";
306 pub const DATA_PREFIX: &'static str = "data";
307 pub const BLOB_PREFIX: &'static str = "blob";
308 pub const WAL_PREFIX: &'static str = "wal";
309 pub const WAL_STABLE: &'static str = "stable-wal";
310 pub const MANIFEST: &'static str = "manifest";
311
312 pub fn data_root(&self) -> PathBuf {
313 self.db_root().join("data")
314 }
315
316 pub fn data_file(&self, id: u64) -> PathBuf {
317 self.data_root()
318 .join(format!("{}{}{}", Self::DATA_PREFIX, Self::SEP, id))
319 }
320
321 pub fn blob_file(&self, id: u64) -> PathBuf {
322 self.data_root()
323 .join(format!("{}{}{}", Self::BLOB_PREFIX, Self::SEP, id))
324 }
325
326 pub fn log_root(&self) -> PathBuf {
327 if self.log_root == self.db_root {
328 self.db_root.join("log")
329 } else {
330 self.log_root.clone()
331 }
332 }
333
334 pub fn db_root(&self) -> PathBuf {
335 self.db_root.clone()
336 }
337
338 pub fn wal_file(&self, group_id: u8, seq: u64) -> PathBuf {
339 self.log_root().join(format!(
340 "{}{}{}{}{}",
341 Self::WAL_PREFIX,
342 Self::SEP,
343 group_id,
344 Self::SEP,
345 seq
346 ))
347 }
348
349 pub fn wal_backup(&self, group_id: u8, seq: u64) -> PathBuf {
350 self.log_root().join(format!(
351 "{}{}{}{}{}",
352 Self::WAL_STABLE,
353 Self::SEP,
354 group_id,
355 Self::SEP,
356 seq
357 ))
358 }
359
360 pub fn manifest(&self) -> PathBuf {
361 self.log_root().join(Self::MANIFEST)
362 }
363
364 pub(crate) fn sync_data_dir(&self) {
365 io::sync_dir(self.data_root()).unwrap_or_else(|e| panic!("can't fail, {:?}", e));
366 }
367
368 pub(crate) fn sync_log_dir(&self) {
369 io::sync_dir(self.log_root()).unwrap_or_else(|e| panic!("can't fail, {:?}", e));
370 }
371}
372
373impl Drop for ParsedOptions {
374 fn drop(&mut self) {
375 if self.inner.tmp_store {
376 log::info!("remove db_root {:?}", self.inner.db_root);
377 let _ = std::fs::remove_dir_all(&self.inner.db_root);
378 }
379 }
380}