marble/
lib.rs

//! # Marble
//!
//! Marble is a low-level object store that you can build
//! your own storage engines and databases on top of.
//!
//! At a high level, it supports atomic batch writes and
//! single-object reads. Garbage collection is manual.
//! All operations are blocking. Nothing is cached
//! in-memory. Objects may be sharded upon GC by providing
//! a custom `Config::partition_function`. Partitioning
//! is not performed on the write batch when it
//! is initially written, because the write batch
//! must be stored in a single file for it to be atomic.
//! But when future calls to `Marble::maintenance`
//! defragment the storage files by rewriting objects
//! that are still live, Marble will use this function
//! to assign the rewritten objects to a particular
//! partition.
//!
//! You should think of Marble as the heap that you
//! flush your write-ahead logs into periodically.
//! It will create a new file for each write batch,
//! and this might actually expand to more files after
//! garbage collection if the batch is significantly
//! larger than the `Config::target_file_size`.
//!
//! Marble does not create any threads or call
//! `Marble::maintenance` automatically under any
//! conditions. You should probably create a background
//! thread that calls this periodically, as sketched below.
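//!
//! For example, a minimal maintenance loop might look like the
//! following sketch (the interval and error handling here are
//! illustrative, not prescriptive):
//!
//! ```no_run
//! let marble = marble::open("background_heap").unwrap();
//!
//! // `Marble` is cloneable and intended to be shared across
//! // threads, so a clone can be moved into a GC thread.
//! let gc_marble = marble.clone();
//!
//! std::thread::spawn(move || loop {
//!     std::thread::sleep(std::time::Duration::from_secs(1));
//!
//!     if let Err(e) = gc_marble.maintenance() {
//!         eprintln!("maintenance error: {:?}", e);
//!     }
//! });
//! ```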
//!
//! # Examples
//!
//! ```
//! let marble = marble::open("heap").unwrap();
//!
//! // Write new data keyed by a `u64` object ID.
//! // Batches contain insertions and deletions
//! // based on whether the value is a Some or None.
//! marble.write_batch([
//!     (0_u64, Some(&[32_u8] as &[u8])),
//!     (4_u64, None),
//! ]).unwrap();
//!
//! // read it back
//! assert_eq!(marble.read(0).unwrap(), Some(vec![32].into_boxed_slice()));
//! assert_eq!(marble.read(4).unwrap(), None);
//! assert_eq!(marble.read(6).unwrap(), None);
//!
//! // after a few more batches that may have caused fragmentation
//! // by overwriting previous objects, perform maintenance which
//! // will defragment the object store based on `Config` settings.
//! let objects_defragmented = marble.maintenance().unwrap();
//!
//! // print out system statistics
//! dbg!(marble.stats());
//! # drop(marble);
//! # std::fs::remove_dir_all("heap").unwrap();
//! ```
//!
//! which prints out something like:
//! ```text
//! marble.stats() = Stats {
//!     live_objects: 1048576,
//!     stored_objects: 1181100,
//!     dead_objects: 132524,
//!     live_ratio: 0.88,
//!     files: 11,
//!     ...
//! }
//! ```
//!
//! If you want to customize the settings passed to Marble,
//! you may specify your own `Config`:
//!
//! ```
//! let config = marble::Config {
//!     path: "my_path".into(),
//!     fsync_each_batch: true,
//!     target_file_size: 64 * 1024 * 1024,
//!     file_compaction_percent: 50,
//!     ..Default::default()
//! };
//!
//! let marble = config.open().unwrap();
//! # drop(marble);
//! # std::fs::remove_dir_all("my_path").unwrap();
//! ```
//!
//! A custom GC sharding function may be provided
//! for partitioning objects based on the object ID
//! and size. This may be useful if your higher-level
//! system allocates certain ranges of object IDs for
//! certain types of objects that you would like to
//! group together, in the hope that objects with similar
//! fragmentation properties (similar expected lifespans,
//! etc.) end up in the same shards. Objects are only
//! sharded when they are defragmented through the
//! `Marble::maintenance` method, because each new
//! write batch must be written together in one
//! file to retain write batch atomicity in the
//! face of crashes.
//!
//! ```
//! // This function shards objects into partitions
//! // similarly to a slab allocator that groups objects
//! // into size buckets based on powers of two.
//! fn shard_by_size(object_id: u64, object_size: usize) -> u8 {
//!     let next_po2 = object_size.next_power_of_two();
//!     u8::try_from(next_po2.trailing_zeros()).unwrap()
//! }
//!
//! let config = marble::Config {
//!     path: "my_sharded_path".into(),
//!     partition_function: shard_by_size,
//!     ..Default::default()
//! };
//!
//! let marble = config.open().unwrap();
//! # drop(marble);
//! # std::fs::remove_dir_all("my_sharded_path").unwrap();
//! ```
use std::fs::File;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::{
    atomic::{
        AtomicBool, AtomicPtr, AtomicU64,
        Ordering::{Acquire, SeqCst},
    },
    Arc,
};

use fault_injection::fallible;

/// A pass-through `Hasher` for `u64` keys such as object IDs and
/// disk locations: `finish` simply returns the last `u64` or `u8`
/// that was written, avoiding general-purpose hashing overhead.
#[derive(Clone, Copy)]
pub struct LocationHasher(u64);

impl Default for LocationHasher {
    #[inline]
    fn default() -> LocationHasher {
        LocationHasher(0)
    }
}

impl std::hash::Hasher for LocationHasher {
    #[inline]
    fn finish(&self) -> u64 {
        self.0
    }

    #[inline]
    fn write_u8(&mut self, n: u8) {
        self.0 = u64::from(n);
    }

    #[inline]
    fn write_u64(&mut self, n: u64) {
        self.0 = n;
    }

    #[inline]
    fn write(&mut self, _: &[u8]) {
        panic!("trying to use LocationHasher with incorrect type");
    }
}

// A HashMap that uses the pass-through `LocationHasher` for its keys.
type Map<K, V> = std::collections::HashMap<K, V, std::hash::BuildHasherDefault<LocationHasher>>;
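
// An illustrative check of the pass-through hashing behavior above:
// hashing a `u64` just returns that `u64`.
#[cfg(test)]
mod location_hasher_tests {
    use std::hash::Hasher;

    #[test]
    fn pass_through_hash() {
        let mut hasher = super::LocationHasher::default();
        hasher.write_u64(42);
        assert_eq!(hasher.finish(), 42);
    }
}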

mod config;
mod debug_delay;
#[cfg(feature = "runtime_validation")]
mod debug_history;
mod disk_location;
mod file_map;
mod gc;
mod location_table;
mod readpath;
mod recovery;
mod trailer;
mod writepath;

pub use config::Config;
use debug_delay::debug_delay;
use disk_location::{DiskLocation, RelativeDiskLocation};
use file_map::FileMap;
use location_table::LocationTable;
use trailer::{read_trailer, read_trailer_from_buf, write_trailer};

// per-object header: an 8-byte length, an 8-byte object ID, and a 4-byte crc
const HEADER_LEN: usize = 20;
const NEW_WRITE_BATCH_BIT: u64 = 1 << 62;
const NEW_WRITE_BATCH_MASK: u64 = u64::MAX - NEW_WRITE_BATCH_BIT;

type ObjectId = u64;

fn read_range_at(file: &File, start: u64, end: u64) -> io::Result<Vec<u8>> {
    use std::os::unix::fs::FileExt;

    let buf_sz: usize = (end - start).try_into().unwrap();

    let mut buf = Vec::with_capacity(buf_sz);

    // The buffer is left uninitialized here; `read_exact_at` below
    // either fills it completely or returns an error.
    unsafe {
        buf.set_len(buf_sz);
    }

    fallible!(file.read_exact_at(&mut buf, start));

    Ok(buf)
}

fn uninit_boxed_slice(len: usize) -> Box<[u8]> {
    use std::alloc::{alloc, Layout};

    let layout = Layout::array::<u8>(len).unwrap();

    // Allocate an uninitialized byte buffer; callers are expected to
    // overwrite its full contents before reading from it.
    unsafe {
        let ptr = alloc(layout);
        let slice = std::slice::from_raw_parts_mut(ptr, len);
        Box::from_raw(slice)
    }
}

// CRC32 over the length prefix, object ID, and object bytes.
fn hash(len_buf: [u8; 8], pid_buf: [u8; 8], object_buf: &[u8]) -> [u8; 4] {
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(&len_buf);
    hasher.update(&pid_buf);
    hasher.update(object_buf);
    let crc: u32 = hasher.finalize();
    crc.to_le_bytes()
}

/// Statistics about the current file contents, for informing
/// decisions around calls to `maintenance`.
#[derive(Debug, Copy, Clone)]
pub struct Stats {
    /// The number of live objects stored in the backing
    /// storage files.
    pub live_objects: u64,
    /// The total number of (potentially duplicated)
    /// objects stored in the backing storage files.
    pub stored_objects: u64,
    /// The number of dead objects that have been replaced
    /// or removed in other storage files, contributing
    /// to fragmentation.
    pub dead_objects: u64,
    /// The ratio of live objects to all objects stored
    /// on disk. This is another way of expressing fragmentation.
    pub live_ratio: f32,
    /// The number of backing storage files that exist and are
    /// being held open.
    pub files: usize,
    /// The sum of the sizes of all files currently on-disk.
    pub total_file_size: u64,
    /// The number of bytes that have been read since
    /// this instance of `Marble` was recovered.
    pub bytes_read: u64,
    /// The number of bytes that have been written due
    /// to calls to both `write_batch` and rewrites caused by
    /// calls to `maintenance` since this instance of `Marble` was recovered.
    pub bytes_written: u64,
    /// The number of bytes written by user calls to
    /// [`crate::Marble::write_batch`] since this instance
    /// was recovered.
    pub high_level_user_bytes_written: u64,
    /// The ratio of all bytes written to the high-level user bytes written
    /// since this instance of `Marble` was recovered. This is basically the
    /// maintenance overhead of on-disk GC in response to objects being rewritten
    /// and defragmentation maintenance copying old data to new homes. 1.0 is "perfect".
    /// If all data needs to be copied once, this will be 2.0, etc... For reference,
    /// many LSM trees will see write amplifications of a few dozen, and b-trees can often
    /// see write amplifications of several hundred. So, if you're under 10 for serious workloads,
    /// you're doing much better than most industrial systems.
    pub write_amplification: f32,
    /// The ratio of the total size of all storage files on disk to the
    /// approximate size of the live data they contain. This goes up with fragmentation, and is
    /// brought back down with calls to `maintenance` that defragment storage files.
    pub space_amplification: f32,
}

#[derive(Default, Debug, Clone, Copy)]
struct Metadata {
    lsn: u64,
    trailer_offset: u64,
    present_objects: u64,
    generation: u8,
    file_size: u64,
}

impl Metadata {
    fn parse(name: &str, file_size: u64) -> Option<Metadata> {
        let mut splits = name.split('-');

        Some(Metadata {
            lsn: u64::from_str_radix(splits.next()?, 16).ok()?,
            trailer_offset: u64::from_str_radix(splits.next()?, 16).ok()?,
            present_objects: u64::from_str_radix(splits.next()?, 16).ok()?,
            generation: u8::from_str_radix(splits.next()?, 16).ok()?,
            file_size,
        })
    }

    fn to_file_name(&self) -> String {
        format!(
            "{:016x}-{:016x}-{:016x}-{:01x}",
            self.lsn, self.trailer_offset, self.present_objects, self.generation
        )
    }
}
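
// An illustrative sanity check that `to_file_name` and `parse`
// round-trip the metadata encoded in storage file names.
#[cfg(test)]
mod metadata_file_name_tests {
    use super::Metadata;

    #[test]
    fn file_name_round_trip() {
        let meta = Metadata {
            lsn: 42,
            trailer_offset: 4096,
            present_objects: 7,
            generation: 3,
            file_size: 8192,
        };

        let name = meta.to_file_name();
        assert_eq!(name, "000000000000002a-0000000000001000-0000000000000007-3");

        let parsed = Metadata::parse(&name, meta.file_size).unwrap();
        assert_eq!(parsed.lsn, meta.lsn);
        assert_eq!(parsed.trailer_offset, meta.trailer_offset);
        assert_eq!(parsed.present_objects, meta.present_objects);
        assert_eq!(parsed.generation, meta.generation);
        assert_eq!(parsed.file_size, meta.file_size);
    }
}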

#[derive(Debug)]
struct FileAndMetadata {
    file: File,
    location: DiskLocation,
    path: AtomicPtr<PathBuf>,
    metadata: AtomicPtr<Metadata>,
    live_objects: AtomicU64,
    generation: u8,
    rewrite_claim: AtomicBool,
    synced: AtomicBool,
}

impl Drop for FileAndMetadata {
    fn drop(&mut self) {
        let empty = self.live_objects.load(Acquire) == 0;
        if empty {
            if let Err(e) = std::fs::remove_file(self.path().unwrap()) {
                eprintln!("failed to remove empty FileAndMetadata on drop: {:?}", e);
            }
        }

        let path_ptr = self.path.load(Acquire);
        if !path_ptr.is_null() {
            drop(unsafe { Box::from_raw(path_ptr) });
        }
        let metadata_ptr = self.metadata.load(Acquire);
        if !metadata_ptr.is_null() {
            drop(unsafe { Box::from_raw(metadata_ptr) });
        }
    }
}

impl FileAndMetadata {
    fn metadata(&self) -> Option<&Metadata> {
        let metadata_ptr = self.metadata.load(Acquire);
        if metadata_ptr.is_null() {
            // metadata not yet initialized
            None
        } else {
            Some(unsafe { &*metadata_ptr })
        }
    }

    fn install_metadata_and_path(&self, metadata: Metadata, path: PathBuf) {
        // NB: install path first because later on we
        // want to be able to assume that if metadata
        // is present, then so is path.
        let path_ptr = Box::into_raw(Box::new(path));
        let old_path_ptr = self.path.swap(path_ptr, SeqCst);
        assert!(old_path_ptr.is_null());

        let meta_ptr = Box::into_raw(Box::new(metadata));
        let old_meta_ptr = self.metadata.swap(meta_ptr, SeqCst);
        assert!(old_meta_ptr.is_null());
    }

    fn path(&self) -> Option<&PathBuf> {
        let path_ptr = self.path.load(Acquire);
        if path_ptr.is_null() {
            // path not yet initialized
            None
        } else {
            Some(unsafe { &*path_ptr })
        }
    }
}

/// Shard based on rough size ranges corresponding to SSD
/// page and block sizes.
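///
/// For example, the default thresholds below map object sizes
/// to shards like this:
///
/// ```
/// use marble::default_partition_function;
///
/// // smaller than an SSD page
/// assert_eq!(default_partition_function(0, 1024), 0);
/// // roughly SSD page-sized
/// assert_eq!(default_partition_function(0, 4096), 1);
/// // roughly SSD block-sized
/// assert_eq!(default_partition_function(0, 64 * 1024), 2);
/// // larger than typical SSD blocks
/// assert_eq!(default_partition_function(0, 8 * 1024 * 1024), 3);
/// ```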
pub fn default_partition_function(_object_id: u64, size: usize) -> u8 {
    const SUBPAGE_MAX: usize = PAGE_MIN - 1;
    const PAGE_MIN: usize = 2048;
    const PAGE_MAX: usize = 16 * 1024;
    const BLOCK_MIN: usize = PAGE_MAX + 1;
    const BLOCK_MAX: usize = 4 * 1024 * 1024;

    match size {
        // items smaller than known SSD page sizes go to shard 0
        0..=SUBPAGE_MAX => 0,
        // items that fall roughly within the range of SSD page sizes go to shard 1
        PAGE_MIN..=PAGE_MAX => 1,
        // items that fall roughly within the size of an SSD block go to shard 2
        BLOCK_MIN..=BLOCK_MAX => 2,
        // large items that are larger than typical SSD block sizes go to shard 3
        _ => 3,
    }
}

/// Open the system with default configuration at the
/// provided path.
pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Marble> {
    let config = Config {
        path: path.as_ref().into(),
        ..Config::default()
    };

    config.open()
}

/// Garbage-collecting object store. A nice solution to back
/// a pagecache, for people building their own databases.
///
/// Writes should generally be performed by some background
/// process whose job it is to clean logs etc...
#[derive(Clone)]
pub struct Marble {
    // maps from ObjectId to DiskLocation
    location_table: LocationTable,
    max_object_id: Arc<AtomicU64>,
    file_map: FileMap,
    config: Config,
    directory_lock: Arc<File>,
    #[cfg(feature = "runtime_validation")]
    debug_history: Arc<std::sync::Mutex<debug_history::DebugHistory>>,
    bytes_read: Arc<AtomicU64>,
    bytes_written: Arc<AtomicU64>,
    high_level_user_bytes_written: Arc<AtomicU64>,
}

impl std::fmt::Debug for Marble {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Marble")
            .field("stats", &self.stats())
            .finish()
    }
}

impl std::fmt::Display for Marble {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Marble {{ ... }}")
    }
}

impl Marble {
    /// Statistics about current files, intended to inform
    /// decisions about when to call `maintenance` based on
    /// desired write and space amplification
    /// characteristics.
    #[doc(alias = "file_statistics")]
    #[doc(alias = "statistics")]
    #[doc(alias = "metrics")]
    #[doc(alias = "info")]
    pub fn stats(&self) -> Stats {
        let (fams_len, total_file_size, stored_objects, live_objects) = self.file_map.stats();

        let bytes_read = self.bytes_read.load(Acquire);

        let bytes_written = self.bytes_written.load(Acquire);

        let high_level_user_bytes_written = self.high_level_user_bytes_written.load(Acquire);

        // `max(1)` avoids a division by zero when nothing is stored yet.
        let live_ratio = live_objects as f32 / stored_objects.max(1) as f32;
        let approximate_live_data = live_ratio * total_file_size as f32;

        let write_amplification = bytes_written as f32 / high_level_user_bytes_written as f32;
        let space_amplification = total_file_size as f32 / approximate_live_data;

        Stats {
            live_objects,
            stored_objects,
            dead_objects: stored_objects - live_objects,
            live_ratio,
            files: fams_len,
            total_file_size,
            bytes_read,
            bytes_written,
            high_level_user_bytes_written,
            write_amplification,
            space_amplification,
        }
    }

    fn prune_empty_files(&self) -> io::Result<()> {
        self.file_map.prune_empty_files(&self.location_table)
    }

    /// If `Config::fsync_each_batch` is `false`, this
    /// method can be called at a desired interval to
    /// ensure that the written batches are durable on
    /// disk.
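    ///
    /// A sketch of deferred durability (the path here is illustrative):
    ///
    /// ```
    /// let config = marble::Config {
    ///     path: "deferred_sync_heap".into(),
    ///     fsync_each_batch: false,
    ///     ..Default::default()
    /// };
    ///
    /// let marble = config.open().unwrap();
    ///
    /// marble.write_batch([(1_u64, Some(&[1_u8] as &[u8]))]).unwrap();
    ///
    /// // make previously written batches durable at a time of our choosing
    /// marble.sync_all().unwrap();
    /// # drop(marble);
    /// # std::fs::remove_dir_all("deferred_sync_heap").unwrap();
    /// ```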
    pub fn sync_all(&self) -> io::Result<()> {
        let synced_files = self.file_map.sync_all()?;
        if synced_files {
            fallible!(self.directory_lock.sync_all());
        }
        Ok(())
    }

    /// Intended for use in recovery, to bootstrap a higher-level object ID allocator.
    ///
    /// Returns a tuple of one higher than the current maximum allocated object ID,
    /// and an iterator over all object IDs beneath that which are currently free
    /// (either never written, or deleted by being stored as a `None` in a write batch).
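    ///
    /// A sketch of usage (the path and object IDs here are illustrative):
    ///
    /// ```
    /// let marble = marble::open("free_object_ids_heap").unwrap();
    ///
    /// marble.write_batch([
    ///     (1_u64, Some(&[1_u8] as &[u8])),
    ///     (3_u64, Some(&[3_u8] as &[u8])),
    /// ]).unwrap();
    ///
    /// let (next_object_id, free_iter) = marble.free_object_ids();
    /// assert!(next_object_id > 3);
    ///
    /// // object IDs below `next_object_id` that were never written,
    /// // such as 2, are reported as free
    /// let free: Vec<u64> = free_iter.collect();
    /// assert!(free.contains(&2));
    /// # drop(marble);
    /// # std::fs::remove_dir_all("free_object_ids_heap").unwrap();
    /// ```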
    pub fn free_object_ids<'a>(&'a self) -> (u64, impl 'a + Iterator<Item = u64>) {
        let max = self.max_object_id.load(Acquire);

        let iter = (0..=max).filter_map(|oid| {
            if self.location_table.load(oid).is_none() {
                Some(oid)
            } else {
                None
            }
        });
        (max + 1, iter)
    }

    /// Returns an Iterator over all currently allocated object IDs.
    pub fn allocated_object_ids<'a>(&'a self) -> impl 'a + Iterator<Item = u64> {
        let max = self.max_object_id.load(Acquire);
        (0..=max).filter_map(|oid| {
            if self.location_table.load(oid).is_some() {
                Some(oid)
            } else {
                None
            }
        })
    }
}