marble/lib.rs
//! # Marble
//!
//! Marble is a low-level object store that you can build
//! your own storage engines and databases on top of.
//!
//! At a high level, it supports atomic batch writes and
//! single-object reads. Garbage collection is manual.
//! All operations are blocking. Nothing is cached
//! in memory. Objects may be sharded upon GC by providing
//! a custom `Config::partition_function`. Partitioning
//! is not performed on a write batch when it is initially
//! written, because the write batch must be stored in a
//! single file for it to be atomic. But when future calls
//! to `Marble::maintenance` defragment the storage files
//! by rewriting objects that are still live, Marble uses
//! this function to assign the rewritten objects to
//! particular partitions.
//!
//! You should think of Marble as the heap that you
//! flush your write-ahead logs into periodically.
//! It will create a new file for each write batch,
//! which may expand to more files after garbage
//! collection if the batch is significantly larger
//! than `Config::target_file_size`.
//!
//! Marble does not create any threads or call
//! `Marble::maintenance` automatically under any
//! conditions. You should probably create a background
//! thread that calls this periodically.
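//!
//! A minimal sketch of such a background thread (the one-second
//! interval is arbitrary; tune it to your workload):
//!
//! ```no_run
//! let marble = marble::open("heap").unwrap();
//! let maintenance_handle = marble.clone();
//!
//! std::thread::spawn(move || loop {
//!     // rewrite fragmented storage files based on `Config` settings
//!     let _objects_rewritten = maintenance_handle.maintenance().unwrap();
//!     std::thread::sleep(std::time::Duration::from_secs(1));
//! });
//! ```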
//!
//! # Examples
//!
//! ```
//! let marble = marble::open("heap").unwrap();
//!
//! // Write new data keyed by a `u64` object ID.
//! // Batches contain insertions and deletions
//! // based on whether the value is a Some or None.
//! marble.write_batch([
//!     (0_u64, Some(&[32_u8] as &[u8])),
//!     (4_u64, None),
//! ]).unwrap();
//!
//! // read it back
//! assert_eq!(marble.read(0).unwrap(), Some(vec![32].into_boxed_slice()));
//! assert_eq!(marble.read(4).unwrap(), None);
//! assert_eq!(marble.read(6).unwrap(), None);
//!
//! // after a few more batches that may have caused fragmentation
//! // by overwriting previous objects, perform maintenance which
//! // will defragment the object store based on `Config` settings.
//! let objects_defragmented = marble.maintenance().unwrap();
//!
//! // print out system statistics
//! dbg!(marble.stats());
//! # drop(marble);
//! # std::fs::remove_dir_all("heap").unwrap();
//! ```
//!
//! which prints out something like
//! ```txt,no_run
//! marble.stats() = Stats {
//!     live_objects: 1048576,
//!     stored_objects: 1181100,
//!     dead_objects: 132524,
//!     live_ratio: 0.8878,
//!     files: 11,
//! }
//! ```
//!
//! If you want to customize the settings passed to Marble,
//! you may specify your own `Config`:
//!
//! ```
//! let config = marble::Config {
//!     path: "my_path".into(),
//!     fsync_each_batch: true,
//!     target_file_size: 64 * 1024 * 1024,
//!     file_compaction_percent: 50,
//!     ..Default::default()
//! };
//!
//! let marble = config.open().unwrap();
//! # drop(marble);
//! # std::fs::remove_dir_all("my_path").unwrap();
//! ```
//!
//! A custom GC sharding function may be provided
//! for partitioning objects based on the object ID
//! and size. This may be useful if your higher-level
//! system allocates certain ranges of object IDs for
//! certain types of objects, and you would like to
//! group objects with similar fragmentation properties
//! (similar expected lifespans, etc.) together. Sharding
//! only happens when objects are defragmented through the
//! `Marble::maintenance` method, because each new
//! write batch must be written together in one
//! file to retain write batch atomicity in the
//! face of crashes.
//!
//! ```
//! // This function shards objects into partitions
//! // similarly to a slab allocator that groups objects
//! // into size buckets based on powers of two.
//! fn shard_by_size(_object_id: u64, object_size: usize) -> u8 {
//!     let next_po2 = object_size.next_power_of_two();
//!     u8::try_from(next_po2.trailing_zeros()).unwrap()
//! }
//!
//! let config = marble::Config {
//!     path: "my_sharded_path".into(),
//!     partition_function: shard_by_size,
//!     ..Default::default()
//! };
//!
//! let marble = config.open().unwrap();
//! # drop(marble);
//! # std::fs::remove_dir_all("my_sharded_path").unwrap();
//! ```
use std::fs::File;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::{
    atomic::{
        AtomicBool, AtomicPtr, AtomicU64,
        Ordering::{Acquire, SeqCst},
    },
    Arc,
};

use fault_injection::fallible;

/// A trivial hasher that uses `u8` and `u64` keys directly
/// as the hash value; hashing any other type panics.
#[derive(Clone, Copy)]
pub struct LocationHasher(u64);

impl Default for LocationHasher {
    #[inline]
    fn default() -> LocationHasher {
        LocationHasher(0)
    }
}

impl std::hash::Hasher for LocationHasher {
    #[inline]
    fn finish(&self) -> u64 {
        self.0
    }

    #[inline]
    fn write_u8(&mut self, n: u8) {
        self.0 = u64::from(n);
    }

    #[inline]
    fn write_u64(&mut self, n: u64) {
        self.0 = n;
    }

    #[inline]
    fn write(&mut self, _: &[u8]) {
        panic!("trying to use LocationHasher with incorrect type");
    }
}

type Map<K, V> = std::collections::HashMap<K, V, std::hash::BuildHasherDefault<LocationHasher>>;

mod config;
mod debug_delay;
#[cfg(feature = "runtime_validation")]
mod debug_history;
mod disk_location;
mod file_map;
mod gc;
mod location_table;
mod readpath;
mod recovery;
mod trailer;
mod writepath;

pub use config::Config;
use debug_delay::debug_delay;
use disk_location::{DiskLocation, RelativeDiskLocation};
use file_map::FileMap;
use location_table::LocationTable;
use trailer::{read_trailer, read_trailer_from_buf, write_trailer};

const HEADER_LEN: usize = 20;
const NEW_WRITE_BATCH_BIT: u64 = 1 << 62;
const NEW_WRITE_BATCH_MASK: u64 = u64::MAX - NEW_WRITE_BATCH_BIT;

type ObjectId = u64;

fn read_range_at(file: &File, start: u64, end: u64) -> io::Result<Vec<u8>> {
    use std::os::unix::fs::FileExt;

    let buf_sz: usize = (end - start).try_into().unwrap();

    let mut buf = Vec::with_capacity(buf_sz);

    unsafe {
        // the uninitialized buffer is fully overwritten (or discarded
        // on error) by the exact read below.
        buf.set_len(buf_sz);
    }

    fallible!(file.read_exact_at(&mut buf, start));

    Ok(buf)
}

fn uninit_boxed_slice(len: usize) -> Box<[u8]> {
    use std::alloc::{alloc, Layout};

    let layout = Layout::array::<u8>(len).unwrap();

    unsafe {
        // the caller is responsible for fully initializing the
        // returned buffer before reading from it.
        let ptr = alloc(layout);
        let slice = std::slice::from_raw_parts_mut(ptr, len);
        Box::from_raw(slice)
    }
}

fn hash(len_buf: [u8; 8], pid_buf: [u8; 8], object_buf: &[u8]) -> [u8; 4] {
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(&len_buf);
    hasher.update(&pid_buf);
    hasher.update(object_buf);
    let crc: u32 = hasher.finalize();
    crc.to_le_bytes()
}

/// Statistics for file contents, to inform decisions
/// about when to call `maintenance`.
#[derive(Debug, Copy, Clone)]
pub struct Stats {
    /// The number of live objects stored in the backing
    /// storage files.
    pub live_objects: u64,
    /// The total number of (potentially duplicated)
    /// objects stored in the backing storage files.
    pub stored_objects: u64,
    /// The number of dead objects that have been replaced
    /// or removed but still occupy space in storage files,
    /// contributing to fragmentation.
    pub dead_objects: u64,
    /// The ratio of live objects to all objects stored
    /// on disk. This is another way of expressing fragmentation.
    pub live_ratio: f32,
    /// The number of backing storage files that exist and are
    /// being held open.
    pub files: usize,
    /// The sum of the sizes of all files currently on-disk.
    pub total_file_size: u64,
    /// The number of bytes that have been read since
    /// this instance of `Marble` was recovered.
    pub bytes_read: u64,
    /// The number of bytes that have been written due
    /// to calls to both `write_batch` and rewrites caused by
    /// calls to `maintenance` since this instance of `Marble` was recovered.
    pub bytes_written: u64,
    /// The number of bytes written from user calls to
    /// [`crate::Marble::write_batch`] since this instance
    /// was recovered.
    pub high_level_user_bytes_written: u64,
    /// The ratio of all bytes written to the high-level user bytes written
    /// since this instance of `Marble` was recovered. This is essentially the
    /// maintenance overhead of on-disk GC in response to objects being rewritten
    /// and defragmentation maintenance copying old data to new homes. 1.0 is "perfect".
    /// If all data needs to be copied once, this will be 2.0, etc... For reference,
    /// many LSM trees will see write amplifications of a few dozen, and b-trees can often
    /// see write amplifications of several hundred. So, if you're under 10 for serious workloads,
    /// you're doing much better than most industrial systems.
    pub write_amplification: f32,
    /// The ratio of the total size of all storage files on disk to the approximate
    /// size of the live data they contain. This goes up with fragmentation, and is
    /// brought back down by calls to `maintenance` that defragment storage files.
    pub space_amplification: f32,
}

#[derive(Default, Debug, Clone, Copy)]
struct Metadata {
    lsn: u64,
    trailer_offset: u64,
    present_objects: u64,
    generation: u8,
    file_size: u64,
}

impl Metadata {
    fn parse(name: &str, file_size: u64) -> Option<Metadata> {
        let mut splits = name.split('-');

        Some(Metadata {
            lsn: u64::from_str_radix(splits.next()?, 16).ok()?,
            trailer_offset: u64::from_str_radix(splits.next()?, 16).ok()?,
            present_objects: u64::from_str_radix(splits.next()?, 16).ok()?,
            generation: u8::from_str_radix(splits.next()?, 16).ok()?,
            file_size,
        })
    }

    fn to_file_name(&self) -> String {
        format!(
            "{:016x}-{:016x}-{:016x}-{:01x}",
            self.lsn, self.trailer_offset, self.present_objects, self.generation
        )
    }
}

#[derive(Debug)]
struct FileAndMetadata {
    file: File,
    location: DiskLocation,
    path: AtomicPtr<PathBuf>,
    metadata: AtomicPtr<Metadata>,
    live_objects: AtomicU64,
    generation: u8,
    rewrite_claim: AtomicBool,
    synced: AtomicBool,
}

impl Drop for FileAndMetadata {
    fn drop(&mut self) {
        let empty = self.live_objects.load(Acquire) == 0;
        if empty {
            if let Err(e) = std::fs::remove_file(self.path().unwrap()) {
                eprintln!("failed to remove empty FileAndMetadata on drop: {:?}", e);
            }
        }

        let path_ptr = self.path.load(Acquire);
        if !path_ptr.is_null() {
            drop(unsafe { Box::from_raw(path_ptr) });
        }
        let metadata_ptr = self.metadata.load(Acquire);
        if !metadata_ptr.is_null() {
            drop(unsafe { Box::from_raw(metadata_ptr) });
        }
    }
}

impl FileAndMetadata {
    fn metadata(&self) -> Option<&Metadata> {
        let metadata_ptr = self.metadata.load(Acquire);
        if metadata_ptr.is_null() {
            // metadata not yet initialized
            None
        } else {
            Some(unsafe { &*metadata_ptr })
        }
    }

    fn install_metadata_and_path(&self, metadata: Metadata, path: PathBuf) {
        // NB: install path first because later on we
        // want to be able to assume that if metadata
        // is present, then so is path.
        let path_ptr = Box::into_raw(Box::new(path));
        let old_path_ptr = self.path.swap(path_ptr, SeqCst);
        assert!(old_path_ptr.is_null());

        let meta_ptr = Box::into_raw(Box::new(metadata));
        let old_meta_ptr = self.metadata.swap(meta_ptr, SeqCst);
        assert!(old_meta_ptr.is_null());
    }

    fn path(&self) -> Option<&PathBuf> {
        let path_ptr = self.path.load(Acquire);
        if path_ptr.is_null() {
            // path not yet initialized
            None
        } else {
            Some(unsafe { &*path_ptr })
        }
    }
}

/// Shard based on rough size ranges corresponding to SSD
/// page and block sizes.
pub fn default_partition_function(_object_id: u64, size: usize) -> u8 {
    const SUBPAGE_MAX: usize = PAGE_MIN - 1;
    const PAGE_MIN: usize = 2048;
    const PAGE_MAX: usize = 16 * 1024;
    const BLOCK_MIN: usize = PAGE_MAX + 1;
    const BLOCK_MAX: usize = 4 * 1024 * 1024;

    match size {
        // items smaller than known SSD page sizes go to shard 0
        0..=SUBPAGE_MAX => 0,
        // items that fall roughly within the range of SSD page sizes go to shard 1
        PAGE_MIN..=PAGE_MAX => 1,
        // items that fall roughly within the size of an SSD block go to shard 2
        BLOCK_MIN..=BLOCK_MAX => 2,
        // large items that are larger than typical SSD block sizes go to shard 3
        _ => 3,
    }
}

/// Open the system with default configuration at the
/// provided path.
pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Marble> {
    let config = Config {
        path: path.as_ref().into(),
        ..Config::default()
    };

    config.open()
}

/// Garbage-collecting object store. A nice solution to back
/// a pagecache, for people building their own databases.
///
/// Writes should generally be performed by some background
/// process whose job it is to clean logs etc...
#[derive(Clone)]
pub struct Marble {
    // maps from ObjectId to DiskLocation
    location_table: LocationTable,
    max_object_id: Arc<AtomicU64>,
    file_map: FileMap,
    config: Config,
    directory_lock: Arc<File>,
    #[cfg(feature = "runtime_validation")]
    debug_history: Arc<std::sync::Mutex<debug_history::DebugHistory>>,
    bytes_read: Arc<AtomicU64>,
    bytes_written: Arc<AtomicU64>,
    high_level_user_bytes_written: Arc<AtomicU64>,
}

impl std::fmt::Debug for Marble {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Marble")
            .field("stats", &self.stats())
            .finish()
    }
}

impl std::fmt::Display for Marble {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Marble {{ ... }}")
    }
}

impl Marble {
    /// Statistics about current files, intended to inform
    /// decisions about when to call `maintenance` based on
    /// desired write and space amplification
    /// characteristics.
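    ///
    /// A minimal sketch of using these statistics to decide when to
    /// trigger GC (the path and the `2.0` space amplification
    /// threshold are illustrative, not recommendations):
    ///
    /// ```no_run
    /// let marble = marble::open("stats_example").unwrap();
    ///
    /// let stats = marble.stats();
    /// if stats.space_amplification > 2.0 {
    ///     // storage files hold a significant amount of dead data,
    ///     // so defragment them by rewriting live objects.
    ///     marble.maintenance().unwrap();
    /// }
    /// ```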
    #[doc(alias = "file_statistics")]
    #[doc(alias = "statistics")]
    #[doc(alias = "metrics")]
    #[doc(alias = "info")]
    pub fn stats(&self) -> Stats {
        let (fams_len, total_file_size, stored_objects, live_objects) = self.file_map.stats();

        let bytes_read = self.bytes_read.load(Acquire);

        let bytes_written = self.bytes_written.load(Acquire);

        let high_level_user_bytes_written = self.high_level_user_bytes_written.load(Acquire);

        let live_ratio = live_objects as f32 / stored_objects.max(1) as f32;
        let approximate_live_data = live_ratio * total_file_size as f32;

        let write_amplification = bytes_written as f32 / high_level_user_bytes_written as f32;
        let space_amplification = total_file_size as f32 / approximate_live_data;

        Stats {
            live_objects,
            stored_objects,
            dead_objects: stored_objects - live_objects,
            live_ratio,
            files: fams_len,
            total_file_size,
            bytes_read,
            bytes_written,
            high_level_user_bytes_written,
            write_amplification,
            space_amplification,
        }
    }

    fn prune_empty_files(&self) -> io::Result<()> {
        self.file_map.prune_empty_files(&self.location_table)
    }

    /// If `Config::fsync_each_batch` is `false`, this
    /// method can be called at a desired interval to
    /// ensure that the written batches are durable on
    /// disk.
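    ///
    /// A minimal sketch of deferring durability to explicit calls
    /// (the `"sync_example"` path is illustrative):
    ///
    /// ```no_run
    /// let config = marble::Config {
    ///     path: "sync_example".into(),
    ///     fsync_each_batch: false,
    ///     ..Default::default()
    /// };
    /// let marble = config.open().unwrap();
    ///
    /// marble.write_batch([(0_u64, Some(&[1_u8] as &[u8]))]).unwrap();
    ///
    /// // make all batches written so far durable at a time of our choosing
    /// marble.sync_all().unwrap();
    /// ```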
    pub fn sync_all(&self) -> io::Result<()> {
        let synced_files = self.file_map.sync_all()?;
        if synced_files {
            fallible!(self.directory_lock.sync_all());
        }
        Ok(())
    }

    /// Intended for use in recovery, to bootstrap a higher-level object ID allocator.
    ///
    /// Returns a tuple of one higher than the current maximum allocated object ID,
    /// and an iterator over all object IDs beneath that which are currently free
    /// (either never written, or deleted by storing a `None` in a write batch).
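    ///
    /// A minimal sketch of feeding this into a higher-level allocator
    /// during recovery (the `"recovery_example"` path and the collection
    /// into a `Vec` are illustrative):
    ///
    /// ```no_run
    /// let marble = marble::open("recovery_example").unwrap();
    ///
    /// let (next_object_id, free_iter) = marble.free_object_ids();
    /// let free_ids: Vec<u64> = free_iter.collect();
    ///
    /// // there can be at most `next_object_id` free IDs beneath it
    /// assert!(free_ids.len() as u64 <= next_object_id);
    ///
    /// // seed a higher-level object ID allocator with these values here
    /// ```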
    pub fn free_object_ids<'a>(&'a self) -> (u64, impl 'a + Iterator<Item = u64>) {
        let max = self.max_object_id.load(Acquire);

        let iter = (0..=max).filter_map(|oid| {
            if self.location_table.load(oid).is_none() {
                Some(oid)
            } else {
                None
            }
        });
        (max + 1, iter)
    }

    /// Returns an Iterator over all currently allocated object IDs.
    pub fn allocated_object_ids<'a>(&'a self) -> impl 'a + Iterator<Item = u64> {
        let max = self.max_object_id.load(Acquire);
        (0..=max).filter_map(|oid| {
            if self.location_table.load(oid).is_some() {
                Some(oid)
            } else {
                None
            }
        })
    }
}
529}