1use std::fmt::Debug;
2use std::fs::create_dir;
3use std::path::{Path, PathBuf};
4
5use biometrics::Collector;
6use mani::ManifestOptions;
7use setsum::Setsum;
8use sst::SstOptions;
9use sst::gc::GarbageCollectionPolicy;
10use sst::log::LogOptions;
11use zerror::{Z, iotoz};
12use zerror_core::ErrorCore;
13
14mod kvs;
15mod reference_counter;
16mod tree;
17mod verifier;
18
19pub use kvs::{KeyValueStore, WriteBatch};
20pub use tree::{CompactionID, LsmTree, NUM_LEVELS};
21pub use verifier::{LsmVerifier, ManifestVerifier};
22
23pub fn register_biometrics(collector: &Collector) {
26 tree::register_biometrics(collector);
27 verifier::register_biometrics(collector);
28}
29
/// Directory under `root` that holds the manifest (opened with `Manifest::open`).
// The uppercase name is intentional; silence the snake_case lint.
#[allow(non_snake_case)]
pub fn MANI_ROOT<P: AsRef<Path>>(root: P) -> PathBuf {
    // `Path::join` already yields an owned `PathBuf`; no intermediate copy of `root` is needed.
    root.as_ref().join("mani")
}

/// The `verify` subdirectory of `root`.
#[allow(non_snake_case)]
pub fn VERIFY_ROOT<P: AsRef<Path>>(root: P) -> PathBuf {
    root.as_ref().join("verify")
}

/// The `sst` subdirectory of `root`, which holds sst files named `<setsum hexdigest>.sst`.
#[allow(non_snake_case)]
pub fn SST_ROOT<P: AsRef<Path>>(root: P) -> PathBuf {
    root.as_ref().join("sst")
}
46
47#[allow(non_snake_case)]
48pub fn SST_FILE<P: AsRef<Path>>(root: P, setsum: Setsum) -> PathBuf {
49 SST_ROOT(root).join(setsum.hexdigest() + ".sst")
50}
51
52#[allow(non_snake_case)]
53pub fn COMPACTION_ROOT<P: AsRef<Path>>(root: P) -> PathBuf {
54 root.as_ref().to_path_buf().join("compaction")
55}
56
57#[allow(non_snake_case)]
58pub fn COMPACTION_DIR<P: AsRef<Path>>(root: P, setsum: Setsum) -> PathBuf {
59 COMPACTION_ROOT(root).join(setsum.hexdigest())
60}
61
62#[allow(non_snake_case)]
63pub fn TRASH_ROOT<P: AsRef<Path>>(root: P) -> PathBuf {
64 root.as_ref().to_path_buf().join("trash")
65}
66
67#[allow(non_snake_case)]
68pub fn TRASH_SST<P: AsRef<Path>>(root: P, setsum: Setsum) -> PathBuf {
69 TRASH_ROOT(root).join(setsum.hexdigest() + ".sst")
70}
71
72#[allow(non_snake_case)]
73pub fn TRASH_LOG<P: AsRef<Path>>(root: P, number: u64) -> PathBuf {
74 TRASH_ROOT(root).join(format!("log.{number}"))
75}
76
77#[allow(non_snake_case)]
78pub fn INGEST_ROOT<P: AsRef<Path>>(root: P) -> PathBuf {
79 root.as_ref().to_path_buf().join("ingest")
80}
81
82#[allow(non_snake_case)]
83pub fn INGEST_FILE<P: AsRef<Path>>(root: P, setsum: Setsum) -> PathBuf {
84 INGEST_ROOT(root).join(setsum.hexdigest() + ".sst")
85}
86
87#[allow(non_snake_case)]
88pub fn TEMP_ROOT<P: AsRef<Path>>(root: P) -> PathBuf {
89 root.as_ref().to_path_buf().join("tmp")
90}
91
92#[allow(non_snake_case)]
93pub fn TEMP_FILE<P: AsRef<Path>>(root: P, setsum: Setsum) -> PathBuf {
94 TEMP_ROOT(root).join(setsum.hexdigest() + ".sst")
95}
96
/// Path of log number `number` directly under `root`: `<root>/log.<number>`.
///
/// `parse_log_file` is the inverse of this naming scheme.
// The uppercase name is intentional; silence the snake_case lint.
#[allow(non_snake_case)]
pub fn LOG_FILE<P: AsRef<Path>>(root: P, number: u64) -> PathBuf {
    // `Path::join` already yields an owned `PathBuf`; no intermediate copy of `root` is needed.
    root.as_ref().join(format!("log.{number}"))
}
101
/// Extracts the log number from a path whose final component is `log.<number>`.
///
/// This is the inverse of `LOG_FILE`. Returns `None` when:
/// - the path has no final component;
/// - the file name is not valid UTF-8;
/// - the file name lacks the `log.` prefix;
/// - the suffix is not the canonical decimal rendering of a `u64`.
///
/// Canonical means no sign, no leading zeros, and no overflow.
fn parse_log_file<P: AsRef<Path>>(path: P) -> Option<u64> {
    // Non-UTF-8 names can never have been produced by `LOG_FILE`.
    let file_name = path.as_ref().file_name()?.to_str()?;
    let digits = file_name.strip_prefix("log.")?;
    let number: u64 = digits.parse().ok()?;
    // `u64::from_str` also accepts "+7" and "007". Those would map to a number whose
    // `LOG_FILE` path names a *different* file, so only accept names that round-trip.
    (number.to_string() == digits).then_some(number)
}
125
126fn ensure_dir(path: PathBuf, commentary: &str) -> Result<(), Error> {
127 if !path.is_dir() {
128 Ok(create_dir(&path).as_z().with_info(commentary, path)?)
129 } else {
130 Ok(())
131 }
132}
133
134fn make_all_dirs<P: AsRef<Path>>(root: P) -> Result<(), Error> {
135 ensure_dir(VERIFY_ROOT(&root), "verify")?;
136 ensure_dir(SST_ROOT(&root), "sst")?;
137 ensure_dir(COMPACTION_ROOT(&root), "compaction")?;
138 ensure_dir(TRASH_ROOT(&root), "trash")?;
139 ensure_dir(INGEST_ROOT(&root), "ingest")?;
140 ensure_dir(TEMP_ROOT(&root), "tmp")?;
141 Ok(())
142}
143
/// Errors returned by this crate.
///
/// Every variant carries an `ErrorCore` alongside its variant-specific context. The crate's
/// `From` impls convert `std::io::Error`, `mani::Error`, and `sst::Error` into `SystemError`,
/// `ManifestError`, and `Sst` respectively.
// NOTE(review): `zerror_derive::Z` may depend on variant order (e.g. for encoding); do not
// reorder or remove variants without checking.
#[derive(Clone, zerror_derive::Z)]
pub enum Error {
    /// Placeholder "no error" value (per its name).
    // NOTE(review): presumably required by zerror conventions — confirm.
    Success {
        core: ErrorCore,
    },
    /// A key of `length` exceeded the permitted `limit`.
    KeyTooLarge {
        core: ErrorCore,
        length: usize,
        limit: usize,
    },
    /// A value of `length` exceeded the permitted `limit`.
    ValueTooLarge {
        core: ErrorCore,
        length: usize,
        limit: usize,
    },
    /// An entry (`new_key`, `new_timestamp`) arrived out of order relative to the previous
    /// entry (`last_key`, `last_timestamp`).
    SortOrder {
        core: ErrorCore,
        last_key: Vec<u8>,
        last_timestamp: u64,
        new_key: Vec<u8>,
        new_timestamp: u64,
    },
    /// A table of `size` reached its `limit`.
    TableFull {
        core: ErrorCore,
        size: usize,
        limit: usize,
    },
    /// A block of `length` was shorter than the `required` length.
    BlockTooSmall {
        core: ErrorCore,
        length: usize,
        required: usize,
    },
    /// Decoding failed with a `prototk::Error`; `context` describes what was being unpacked.
    UnpackError {
        core: ErrorCore,
        error: prototk::Error,
        context: String,
    },
    /// A CRC-32C check failed over `start`..`limit`.
    // NOTE(review): whether `crc32c` is the expected or the computed checksum — confirm.
    Crc32cFailure {
        core: ErrorCore,
        start: u64,
        limit: u64,
        crc32c: u32,
    },
    /// Data was found to be corrupt; `context` describes the problem.
    Corruption {
        core: ErrorCore,
        context: String,
    },
    /// Presumably an internal invariant violation; `context` describes it.
    LogicError {
        core: ErrorCore,
        context: String,
    },
    /// An I/O failure. `From<std::io::Error>` stores only the error's `to_string()` in `what`.
    SystemError {
        core: ErrorCore,
        what: String,
    },
    /// Opening another file would exceed `limit`.
    // NOTE(review): presumably tied to `LsmtkOptions::max_open_files` — confirm.
    TooManyOpenFiles {
        core: ErrorCore,
        limit: usize,
    },
    /// An empty batch was supplied where at least one entry is needed.
    EmptyBatch {
        core: ErrorCore,
    },
    /// An sst (described by `what`) already exists.
    DuplicateSst {
        core: ErrorCore,
        what: String,
    },
    /// No sst was found for `setsum` (a hex-digest string).
    SstNotFound {
        core: ErrorCore,
        setsum: String,
    },
    /// A problem with `path`; `what` describes it.
    PathError {
        core: ErrorCore,
        path: PathBuf,
        what: String,
    },
    /// Wraps an error from the `mani` manifest crate.
    ManifestError {
        core: ErrorCore,
        what: mani::Error,
    },
    /// Presumably a compaction identified by `setsum` conflicts with one already running.
    ConcurrentCompaction {
        core: ErrorCore,
        setsum: String,
    },
    /// Presumably a signal to back off and retry; `path` names the resource involved.
    Backoff {
        core: ErrorCore,
        path: String,
    },
    /// Wraps an error from the `sst` crate.
    Sst {
        core: ErrorCore,
        what: sst::Error,
    },
}
238
239impl From<std::io::Error> for Error {
240 fn from(what: std::io::Error) -> Error {
241 Error::SystemError {
242 core: ErrorCore::default(),
243 what: what.to_string(),
244 }
245 }
246}
247
248impl From<mani::Error> for Error {
249 fn from(what: mani::Error) -> Error {
250 Error::ManifestError {
251 core: ErrorCore::default(),
252 what,
253 }
254 }
255}
256
257impl From<sst::Error> for Error {
258 fn from(what: sst::Error) -> Error {
259 Error::Sst {
260 core: ErrorCore::default(),
261 what,
262 }
263 }
264}
265
266iotoz! {Error}
267
/// Configuration for an LSM-tree key-value store rooted at `path`.
///
/// Every field has a default (see the `Default` impl). With the `command_line` feature, the
/// struct also derives `arrrg_derive::CommandLine`; the `mani`, `log`, and `sst` groups are
/// marked `arrrg(nested)`.
#[derive(Clone, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "command_line", derive(arrrg_derive::CommandLine))]
pub struct LsmtkOptions {
    /// Options for the manifest (`mani` crate).
    #[cfg_attr(feature = "command_line", arrrg(nested))]
    mani: ManifestOptions,
    /// Options for log files (`sst::log::LogOptions`).
    #[cfg_attr(feature = "command_line", arrrg(nested))]
    log: LogOptions,
    /// Options for sst files (`sst::SstOptions`).
    #[cfg_attr(feature = "command_line", arrrg(nested))]
    sst: SstOptions,
    /// Root directory of the store. The manifest, sst, trash, and other directories live beneath
    /// it (see `MANI_ROOT`, `SST_ROOT`, ...).
    #[cfg_attr(
        feature = "command_line",
        arrrg(required, "Root path for the lsmtk", "PATH")
    )]
    path: String,
    /// Maximum number of files to open.
    #[cfg_attr(
        feature = "command_line",
        arrrg(optional, "Maximum number of files to open", "FILES")
    )]
    max_open_files: usize,
    /// Maximum number of bytes permitted in a single compaction.
    #[cfg_attr(
        feature = "command_line",
        arrrg(optional, "Maximum number of bytes permitted in a compaction", "BYTES")
    )]
    max_compaction_bytes: usize,
    /// Maximum number of files permitted in a single compaction.
    #[cfg_attr(
        feature = "command_line",
        arrrg(optional, "Maximum number of files permitted in a compaction", "FILES")
    )]
    max_compaction_files: usize,
    /// L0 file count at which compaction becomes mandatory.
    #[cfg_attr(
        feature = "command_line",
        arrrg(
            optional,
            "Maximum number of files permitted in L0 before compaction becomes mandatory.",
            "FILES"
        )
    )]
    l0_mandatory_compaction_threshold_files: usize,
    /// L0 size in bytes at which compaction becomes mandatory.
    #[cfg_attr(
        feature = "command_line",
        arrrg(
            optional,
            "Maximum number of bytes permitted in L0 before compaction becomes mandatory.",
            "BYTES"
        )
    )]
    l0_mandatory_compaction_threshold_bytes: usize,
    /// L0 file count at which writes begin to stall.
    #[cfg_attr(
        feature = "command_line",
        arrrg(
            optional,
            "Maximum number of files permitted in L0 before writes begin to stall.",
            "FILES"
        )
    )]
    l0_write_stall_threshold_files: usize,
    /// L0 size in bytes at which writes begin to stall.
    #[cfg_attr(
        feature = "command_line",
        arrrg(
            optional,
            "Maximum number of bytes permitted in L0 before writes begin to stall.",
            "BYTES"
        )
    )]
    l0_write_stall_threshold_bytes: usize,
    /// Size in bytes a memtable may grow to before it is compacted into L0.
    #[cfg_attr(
        feature = "command_line",
        arrrg(
            optional,
            "Maximum number of bytes to grow a memtable to before compacting into L0.",
            "BYTES"
        )
    )]
    memtable_size_bytes: usize,
    /// Garbage-collection policy. Per the flag help, only `versions=X` policies collect at
    /// present.
    #[cfg_attr(
        feature = "command_line",
        arrrg(
            optional,
            "Garbage collection policy as a string; only versions=X will collect at the moment.",
            "POLICY"
        )
    )]
    gc_policy: GarbageCollectionPolicy,
    /// Number of bytes to use for the sst cache.
    #[cfg_attr(
        feature = "command_line",
        arrrg(optional, "Number of bytes to use for the sst cache.", "BYTES")
    )]
    sst_cache_bytes: usize,
}
360
361impl LsmtkOptions {
362 pub fn path(&self) -> &str {
363 &self.path
364 }
365}
366
367impl Default for LsmtkOptions {
368 fn default() -> Self {
369 Self {
370 mani: ManifestOptions::default(),
371 log: LogOptions::default(),
372 sst: SstOptions::default(),
373 path: "db".to_owned(),
374 max_open_files: 1 << 19,
375 max_compaction_bytes: 1 << 29,
376 max_compaction_files: 1 << 6,
377 l0_mandatory_compaction_threshold_files: 4,
378 l0_mandatory_compaction_threshold_bytes: 1 << 26,
379 l0_write_stall_threshold_files: 12,
380 l0_write_stall_threshold_bytes: 1 << 28,
381 memtable_size_bytes: 1 << 26,
382 gc_policy: GarbageCollectionPolicy::try_from("versions = 1").unwrap(),
383 sst_cache_bytes: 1 << 26,
384 }
385 }
386}
387
/// Process-wide `indicio` trace collector exported by this crate.
// NOTE(review): presumably the sink for this crate's trace events — confirm against call sites
// in the submodules.
pub static TRACING: indicio::Collector = indicio::Collector::new();
391
#[cfg(test)]
mod test_util {
    //! Fixtures shared by this crate's unit tests.
    //!
    //! The fixture macros expand at the call site. They reach this module through `$crate` and
    //! name `std` items by absolute path, so callers need not import `test_util` or `PathBuf`.

    use std::fs::remove_dir_all;
    use std::sync::Mutex;

    use super::*;

    /// Serializes the fixture macros below.
    ///
    /// Each macro stages its output in a fixed scratch file in the current working directory
    /// (`sst.tmp` or `log.tmp`), which concurrently running tests would otherwise clobber.
    pub static SST_FOR_TEST_MUTEX: Mutex<()> = Mutex::new(());

    /// Returns a per-test directory path, removing any leftover directory from a previous run.
    ///
    /// Every character of `root` that is not ASCII alphanumeric becomes `_`, and `_<line>` is
    /// appended. Callers pass `module_path!()` and `line!()` so each test gets its own directory.
    ///
    /// # Panics
    ///
    /// Panics if a stale directory exists and cannot be removed.
    pub fn test_root(root: &str, line: u32) -> String {
        let root: String = root
            .chars()
            .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
            .collect();
        let path = PathBuf::from(format!("{root}_{line}"));
        if path.exists() {
            remove_dir_all(&path).expect("could not prepare for test");
        }
        String::from(path.to_string_lossy())
    }

    /// Builds an sst in directory `$output_dir` from the listed key/value pairs.
    ///
    /// All pairs are written at timestamp 1. The macro evaluates to the sst's path,
    /// `$output_dir/<setsum hexdigest>.sst`. Keys must be listed in the order the builder
    /// accepts (presumably ascending).
    #[macro_export]
    macro_rules! sst_for_test {
        ($output_dir:ident: $($key:literal => $val:literal,)*) => {
            {
                // Recover from poisoning so that one failed fixture does not cascade into every
                // later test. A stale scratch file it may have left is removed just below.
                let _mutex = $crate::test_util::SST_FOR_TEST_MUTEX
                    .lock()
                    .unwrap_or_else(::std::sync::PoisonError::into_inner);
                let tmp = ::std::path::PathBuf::from("sst.tmp");
                if tmp.exists() {
                    ::std::fs::remove_file(&tmp).as_z().pretty_unwrap();
                }
                ::std::fs::create_dir_all(&$output_dir).as_z().pretty_unwrap();
                let options = sst::SstOptions::default();
                let mut sst_builder = sst::SstBuilder::new(options, &tmp).as_z().pretty_unwrap();
                $(sst_builder.put($key.as_bytes(), 1, $val.as_bytes()).as_z().pretty_unwrap();)*
                let sst = sst_builder.seal().as_z().pretty_unwrap();
                let setsum = sst.fast_setsum();
                let output = ::std::path::PathBuf::from(&$output_dir)
                    .join(format!("{}.sst", setsum.hexdigest()));
                ::std::fs::rename(&tmp, &output).as_z().pretty_unwrap();
                output
            }
        };
    }

    /// Asserts that the sst at `$test_root/$relative_path` contains exactly the listed
    /// key/value pairs, in order.
    #[macro_export]
    macro_rules! sst_check {
        ($test_root:expr; $relative_path:literal: $($key:literal => $val:literal,)*) => {
            let sst_path = ::std::path::PathBuf::from($test_root).join($relative_path);
            let sst = sst::Sst::<sst::file_manager::FileHandle>::new(sst::SstOptions::default(), sst_path).as_z().pretty_unwrap();
            let mut cursor = sst.cursor();
            cursor.seek_to_first().as_z().pretty_unwrap();
            $(
                cursor.next().as_z().pretty_unwrap();
                let kvr = cursor.key_value().expect("key-value pair should not be none");
                assert_eq!($key.as_bytes(), kvr.key);
                let value: &[u8] = $val.as_bytes();
                assert_eq!(Some(value), kvr.value);
            )*
            // After the listed pairs, the cursor must be exhausted.
            cursor.next().as_z().pretty_unwrap();
            assert_eq!(None, cursor.key_value());
        };
    }

    /// Writes a log at `$test_root/$relative_path` holding the listed key/value pairs.
    ///
    /// All pairs are written at timestamp 1, in the order given. The macro evaluates to the
    /// setsum returned by sealing the log. Missing parent directories are created.
    #[macro_export]
    macro_rules! log_for_test {
        ($test_root:expr; $relative_path:literal: $($key:literal => $val:literal,)*) => {
            {
                // Recover from poisoning so that one failed fixture does not cascade into every
                // later test. A stale scratch file it may have left is removed just below.
                let _mutex = $crate::test_util::SST_FOR_TEST_MUTEX
                    .lock()
                    .unwrap_or_else(::std::sync::PoisonError::into_inner);
                let tmp = ::std::path::PathBuf::from("log.tmp");
                if tmp.exists() {
                    ::std::fs::remove_file(&tmp).as_z().pretty_unwrap();
                }
                let output_file = ::std::path::PathBuf::from($test_root).join($relative_path);
                let output_dir = output_file
                    .parent()
                    .map(::std::path::PathBuf::from)
                    .unwrap_or_else(|| ::std::path::PathBuf::from("."));
                ::std::fs::create_dir_all(&output_dir).as_z().pretty_unwrap();
                let options = sst::LogOptions::default();
                let mut log_builder = sst::LogBuilder::new(options, &tmp).as_z().pretty_unwrap();
                $(log_builder.put($key.as_bytes(), 1, $val.as_bytes()).as_z().pretty_unwrap();)*
                let setsum = log_builder.seal().as_z().pretty_unwrap().0;
                ::std::fs::rename(&tmp, &output_file).as_z().pretty_unwrap();
                setsum
            }
        };
    }

    /// Asserts that the log at `$test_root/$relative_path` yields exactly the listed key/value
    /// pairs, in order.
    #[macro_export]
    macro_rules! log_check {
        ($test_root:expr; $relative_path:literal: $($key:literal => $val:literal,)*) => {
            let log_path = ::std::path::PathBuf::from($test_root).join($relative_path);
            let mut log = sst::LogIterator::new(sst::LogOptions::default(), log_path).as_z().pretty_unwrap();
            $(
                let kvr = log.next().as_z().pretty_unwrap().expect("next should not be None");
                assert_eq!($key.as_bytes(), kvr.key);
                let value: &[u8] = $val.as_bytes();
                assert_eq!(Some(value), kvr.value);
            )*
            // After the listed pairs, the log must be exhausted.
            assert_eq!(None, log.next().as_z().pretty_unwrap());
        };
    }
}
492
/// Tests for the fixture macros and for how `KeyValueStore::open` treats leftover logs,
/// ssts, and manifest entries in the store directory.
#[cfg(test)]
mod tests {
    use mani::{Edit, Manifest};
    use sst::{Builder, Cursor};

    use super::*;

    // The sst fixture names its output by setsum; `sst_check!` re-reads it under that name.
    #[test]
    fn test_sst_for_test() {
        let test_root = PathBuf::from(test_util::test_root(module_path!(), line!()));
        let sst_root = test_root.join("sst");
        let _output = sst_for_test! {
            sst_root:
            "key1" => "value1",
            "key2" => "value2",
        };
        sst_check! {
            &test_root; "sst/fb93e8e143482d6eef570088782f6bee22e519dc17a4ef56347a65d5fddf7b6a.sst":
            "key1" => "value1",
            "key2" => "value2",
        };
    }

    // The log holds the same pairs as the sst in `test_sst_for_test`, written in the opposite
    // order. Its setsum still equals that sst's digest, so the setsum does not depend on write
    // order.
    #[test]
    fn test_log_for_test() {
        let test_root = PathBuf::from(test_util::test_root(module_path!(), line!()));
        let setsum = log_for_test! {
            &test_root; "log.0":
            "key2" => "value2",
            "key1" => "value1",
        };
        assert_eq!(
            "fb93e8e143482d6eef570088782f6bee22e519dc17a4ef56347a65d5fddf7b6a",
            setsum.hexdigest()
        );
        log_check! {
            &test_root; "log.0":
            "key2" => "value2",
            "key1" => "value1",
        };
    }

    // Opening a store in a fresh (just-cleared) directory succeeds.
    #[test]
    fn empty_kvs() {
        let root = test_util::test_root(module_path!(), line!());
        let options = LsmtkOptions {
            path: root.clone(),
            ..Default::default()
        };
        KeyValueStore::open(options).expect("key-value store should open");
    }

    // A leftover log with no sst. After open, an sst with the log's contents exists and the log
    // has been removed.
    #[test]
    fn log_no_sst() {
        let root = test_util::test_root(module_path!(), line!());
        let options = LsmtkOptions {
            path: root.clone(),
            ..Default::default()
        };
        let _setsum = log_for_test! {
            &root; "log.0":
            "key2" => "value2",
            "key1" => "value1",
        };
        KeyValueStore::open(options).as_z().pretty_unwrap();
        sst_check! {
            &root; "sst/fb93e8e143482d6eef570088782f6bee22e519dc17a4ef56347a65d5fddf7b6a.sst":
            "key1" => "value1",
            "key2" => "value2",
        };
        assert!(!PathBuf::from(&root).join("log.0").exists());
    }

    // A leftover log plus an sst with the same contents, but no manifest. Open succeeds, the sst
    // is intact, and the log has been removed.
    #[test]
    fn log_sst_no_manifest() {
        let root = test_util::test_root(module_path!(), line!());
        let options = LsmtkOptions {
            path: root.clone(),
            ..Default::default()
        };
        let _setsum = log_for_test! {
            &root; "log.0":
            "key2" => "value2",
            "key1" => "value1",
        };
        let sst_root = SST_ROOT(&root);
        let _path = sst_for_test! {
            sst_root:
            "key1" => "value1",
            "key2" => "value2",
        };
        let _kvs = KeyValueStore::open(options).expect("key-value store should open");
        sst_check! {
            &root; "sst/fb93e8e143482d6eef570088782f6bee22e519dc17a4ef56347a65d5fddf7b6a.sst":
            "key1" => "value1",
            "key2" => "value2",
        };
        assert!(!PathBuf::from(&root).join("log.0").exists());
    }

    // A leftover log, the matching sst, and a manifest edit that already adds that sst. Open
    // succeeds, the sst is intact, and the log has been removed.
    // NOTE(review): the 'I'/'O'/'D' infos look like input/output/difference setsums (D appears
    // to equal I minus O) — confirm against the tree's manifest handling.
    #[test]
    fn log_sst_manifest_no_rm() {
        let root = test_util::test_root(module_path!(), line!());
        let options = LsmtkOptions {
            path: root.clone(),
            ..Default::default()
        };
        let _setsum = log_for_test! {
            &root; "log.0":
            "key2" => "value2",
            "key1" => "value1",
        };
        let sst_root = SST_ROOT(&root);
        let _path = sst_for_test! {
            sst_root:
            "key1" => "value1",
            "key2" => "value2",
        };
        let mut mani =
            Manifest::open(options.mani.clone(), MANI_ROOT(&root)).expect("manifest should open");
        let mut edit = Edit::default();
        edit.add("fb93e8e143482d6eef570088782f6bee22e519dc17a4ef56347a65d5fddf7b6a")
            .expect("manifest edit should never fail");
        edit.info(
            'I',
            "0000000000000000000000000000000000000000000000000000000000000000",
        )
        .expect("manifest info should never fail");
        edit.info(
            'O',
            "fb93e8e143482d6eef570088782f6bee22e519dc17a4ef56347a65d5fddf7b6a",
        )
        .expect("manifest info should never fail");
        edit.info(
            'D',
            "006c171eacb7d291d0a7ff7725d09411731ae623625b10a933859a2a4a1f8495",
        )
        .expect("manifest info should never fail");
        mani.apply(edit).expect("manifest apply should never fail");
        // Release the manifest before the store opens it.
        drop(mani);
        let _kvs = KeyValueStore::open(options).expect("key-value store should open");
        sst_check! {
            &root; "sst/fb93e8e143482d6eef570088782f6bee22e519dc17a4ef56347a65d5fddf7b6a.sst":
            "key1" => "value1",
            "key2" => "value2",
        };
        assert!(!PathBuf::from(&root).join("log.0").exists());
    }

    // The manifest records three edits against an sst file on disk:
    //   1. an all-zero edit;
    //   2. an edit that adds the sst;
    //   3. an edit that removes it.
    // After open, the removed sst is present in the trash directory.
    #[test]
    fn orphan_cleanup() {
        let root = test_util::test_root(module_path!(), line!());
        let options = LsmtkOptions {
            path: root.clone(),
            ..Default::default()
        };
        let sst_root = SST_ROOT(&root);
        let _path = sst_for_test! {
            sst_root:
            "key1" => "value1",
            "key2" => "value2",
        };
        let mut mani =
            Manifest::open(options.mani.clone(), MANI_ROOT(&root)).expect("manifest should open");
        let mut edit = Edit::default();
        edit.info(
            'I',
            "0000000000000000000000000000000000000000000000000000000000000000",
        )
        .expect("manifest info should never fail");
        edit.info(
            'O',
            "0000000000000000000000000000000000000000000000000000000000000000",
        )
        .expect("manifest info should never fail");
        edit.info(
            'D',
            "0000000000000000000000000000000000000000000000000000000000000000",
        )
        .expect("manifest info should never fail");
        mani.apply(edit).expect("manifest apply should never fail");
        let mut edit = Edit::default();
        edit.add("fb93e8e143482d6eef570088782f6bee22e519dc17a4ef56347a65d5fddf7b6a")
            .expect("manifest edit should never fail");
        edit.info(
            'I',
            "0000000000000000000000000000000000000000000000000000000000000000",
        )
        .expect("manifest info should never fail");
        edit.info(
            'O',
            "fb93e8e143482d6eef570088782f6bee22e519dc17a4ef56347a65d5fddf7b6a",
        )
        .expect("manifest info should never fail");
        edit.info(
            'D',
            "006c171eacb7d291d0a7ff7725d09411731ae623625b10a933859a2a4a1f8495",
        )
        .expect("manifest info should never fail");
        mani.apply(edit).expect("manifest apply should never fail");
        let mut edit = Edit::default();
        edit.rm("fb93e8e143482d6eef570088782f6bee22e519dc17a4ef56347a65d5fddf7b6a")
            .expect("manifest edit should never fail");
        edit.info(
            'I',
            "fb93e8e143482d6eef570088782f6bee22e519dc17a4ef56347a65d5fddf7b6a",
        )
        .expect("manifest info should never fail");
        edit.info(
            'O',
            "0000000000000000000000000000000000000000000000000000000000000000",
        )
        .expect("manifest info should never fail");
        edit.info(
            'D',
            "fb93e8e143482d6eef570088782f6bee22e519dc17a4ef56347a65d5fddf7b6a",
        )
        .expect("manifest info should never fail");
        mani.apply(edit).expect("manifest apply should never fail");
        // Release the manifest before the store opens it.
        drop(mani);
        let _kvs = KeyValueStore::open(options).expect("key-value store should open");
        assert!(
            TRASH_SST(
                &root,
                Setsum::from_hexdigest(
                    "fb93e8e143482d6eef570088782f6bee22e519dc17a4ef56347a65d5fddf7b6a"
                )
                .expect("valid setsum")
            )
            .exists()
        );
    }
}