use std::fs::File;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::{
atomic::{
AtomicBool, AtomicPtr, AtomicU64,
Ordering::{Acquire, SeqCst},
},
Arc,
};
use fault_injection::fallible;
/// A pass-through "hasher" for object-id keys: the written `u64`/`u8` value
/// is used directly as the hash. `Default` is derived (state starts at 0),
/// replacing a hand-written impl that did exactly the same thing.
#[derive(Clone, Copy, Default)]
pub struct LocationHasher(u64);

impl std::hash::Hasher for LocationHasher {
    #[inline]
    fn finish(&self) -> u64 {
        self.0
    }

    #[inline]
    fn write_u8(&mut self, n: u8) {
        // Replaces (not mixes) the state: each key is written exactly once.
        self.0 = u64::from(n);
    }

    #[inline]
    fn write_u64(&mut self, n: u64) {
        self.0 = n;
    }

    #[inline]
    fn write(&mut self, _: &[u8]) {
        // Only fixed-width integer keys are supported; anything else is a
        // programming error in this crate.
        panic!("trying to use LocationHasher with incorrect type");
    }
}
type Map<K, V> = std::collections::HashMap<K, V, std::hash::BuildHasherDefault<LocationHasher>>;
mod config;
mod debug_delay;
#[cfg(feature = "runtime_validation")]
mod debug_history;
mod disk_location;
mod file_map;
mod gc;
mod location_table;
mod readpath;
mod recovery;
mod trailer;
mod writepath;
pub use config::Config;
use debug_delay::debug_delay;
use disk_location::{DiskLocation, RelativeDiskLocation};
use file_map::FileMap;
use location_table::LocationTable;
use trailer::{read_trailer, read_trailer_from_buf, write_trailer};
// Bytes of per-object header: presumably 8-byte length + 8-byte object id +
// 4-byte CRC, matching the fields covered by `hash` — confirm against
// writepath/readpath (not visible here).
const HEADER_LEN: usize = 20;
// Flag bit (bit 62) — NOTE(review): appears to mark values belonging to a
// new write batch; confirm against writepath.
const NEW_WRITE_BATCH_BIT: u64 = 1 << 62;
// All bits set except NEW_WRITE_BATCH_BIT; ANDing with this clears the flag.
const NEW_WRITE_BATCH_MASK: u64 = u64::MAX - NEW_WRITE_BATCH_BIT;
// Identifier for a stored object.
type ObjectId = u64;
/// Read the byte range `[start, end)` from `file` at an absolute offset
/// (does not move the file cursor).
///
/// # Errors
///
/// Returns any I/O error from `read_exact_at`, including `UnexpectedEof`
/// if the file is shorter than `end`.
fn read_range_at(file: &File, start: u64, end: u64) -> io::Result<Vec<u8>> {
    use std::os::unix::fs::FileExt;

    let buf_sz: usize = (end - start).try_into().unwrap();

    // Zero-initialize the buffer. The previous version used
    // `with_capacity` + `set_len`, which handed a view of uninitialized
    // memory to `read_exact_at` — undefined behavior even if the read
    // overwrites every byte.
    let mut buf = vec![0u8; buf_sz];

    fallible!(file.read_exact_at(&mut buf, start));

    Ok(buf)
}
/// Allocate an owned, fixed-size byte buffer of `len` bytes (zero-filled).
///
/// The name is historical: the buffer is now zero-initialized because the
/// previous raw-`alloc` implementation had three soundness problems — it
/// never checked `alloc` for null, it passed a zero-size layout to `alloc`
/// when `len == 0` (undefined behavior), and it exposed uninitialized bytes
/// through a safe `Box<[u8]>`.
fn uninit_boxed_slice(len: usize) -> Box<[u8]> {
    vec![0u8; len].into_boxed_slice()
}
/// CRC32 checksum over the on-disk header fields (length, object id) and
/// the object payload, returned as little-endian bytes.
fn hash(len_buf: [u8; 8], pid_buf: [u8; 8], object_buf: &[u8]) -> [u8; 4] {
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(len_buf.as_ref());
    hasher.update(pid_buf.as_ref());
    hasher.update(object_buf);
    hasher.finalize().to_le_bytes()
}
/// A snapshot of storage statistics, produced by `Marble::stats`.
#[derive(Debug, Copy, Clone)]
pub struct Stats {
    /// Objects that currently resolve to a location.
    pub live_objects: u64,
    /// Total objects present across all storage files (live and dead).
    pub stored_objects: u64,
    /// `stored_objects - live_objects`: garbage not yet reclaimed.
    pub dead_objects: u64,
    /// `live_objects / stored_objects` (denominator clamped to at least 1).
    pub live_ratio: f32,
    /// Number of storage files currently tracked.
    pub files: usize,
    /// Combined size in bytes of all tracked files.
    pub total_file_size: u64,
    /// Cumulative bytes-read counter.
    pub bytes_read: u64,
    /// Cumulative bytes-written counter.
    pub bytes_written: u64,
    /// Bytes written at the user-facing API level (excludes internal
    /// rewrites, judging by its use as the write-amplification denominator).
    pub high_level_user_bytes_written: u64,
    /// `bytes_written / high_level_user_bytes_written`.
    pub write_amplification: f32,
    /// `total_file_size / (live_ratio * total_file_size)`.
    pub space_amplification: f32,
}
/// Metadata encoded into each storage file's name, plus the observed
/// on-disk file size.
#[derive(Default, Debug, Clone, Copy)]
struct Metadata {
    /// Log sequence number of the file.
    lsn: u64,
    /// Byte offset of the file trailer.
    trailer_offset: u64,
    /// Number of objects present in the file.
    present_objects: u64,
    /// File generation (single hex digit in the file name).
    generation: u8,
    /// Size of the file in bytes (not encoded in the name; supplied by the
    /// caller from the filesystem).
    file_size: u64,
}

impl Metadata {
    /// Parse a `Metadata` out of a file name of the form produced by
    /// [`Metadata::to_file_name`]: four `-`-separated hex fields
    /// (lsn, trailer offset, present objects, generation).
    ///
    /// Returns `None` if any field is missing or is not valid hex.
    fn parse(name: &str, file_size: u64) -> Option<Metadata> {
        let mut splits = name.split('-');

        Some(Metadata {
            lsn: u64::from_str_radix(splits.next()?, 16).ok()?,
            trailer_offset: u64::from_str_radix(splits.next()?, 16).ok()?,
            present_objects: u64::from_str_radix(splits.next()?, 16).ok()?,
            generation: u8::from_str_radix(splits.next()?, 16).ok()?,
            file_size,
        })
    }

    /// Render the canonical file name for this metadata: three 16-digit hex
    /// fields plus a 1-digit hex generation, joined by `-`.
    fn to_file_name(&self) -> String {
        format!(
            "{:016x}-{:016x}-{:016x}-{:01x}",
            self.lsn, self.trailer_offset, self.present_objects, self.generation
        )
    }
}
/// A storage file plus its bookkeeping. `path` and `metadata` start null
/// and are installed once via `install_metadata_and_path` as leaked boxes
/// behind atomic pointers; they are reclaimed in `Drop`.
#[derive(Debug)]
struct FileAndMetadata {
    // Open handle to the underlying storage file.
    file: File,
    // Base disk location for objects stored in this file.
    location: DiskLocation,
    // Null until installed; then points to a leaked `Box<PathBuf>`.
    path: AtomicPtr<PathBuf>,
    // Null until installed; then points to a leaked `Box<Metadata>`.
    metadata: AtomicPtr<Metadata>,
    // Live objects still resolving into this file; when it is 0 at drop
    // time, the file is deleted from disk (see the Drop impl).
    live_objects: AtomicU64,
    // File generation number.
    generation: u8,
    // NOTE(review): presumably set when a rewrite/GC pass claims this file
    // exclusively — confirm against the gc module (not visible here).
    rewrite_claim: AtomicBool,
    // NOTE(review): presumably tracks whether this file has been fsynced —
    // confirm against file_map::sync_all (not visible here).
    synced: AtomicBool,
}
impl Drop for FileAndMetadata {
    fn drop(&mut self) {
        // If no live objects remain, every byte in the file is stale, so
        // delete it from disk as final cleanup. Deletion failure is only
        // logged — drop cannot propagate errors.
        let empty = self.live_objects.load(Acquire) == 0;
        if empty {
            // NOTE(review): `path().unwrap()` panics if this is dropped
            // before install_metadata_and_path ever ran — presumably
            // construction always installs a path before a fam with zero
            // live objects can be dropped; confirm at construction sites.
            if let Err(e) = std::fs::remove_file(self.path().unwrap()) {
                eprintln!("failed to remove empty FileAndMetadata on drop: {:?}", e);
            }
        }
        // Reclaim the boxes leaked by install_metadata_and_path. A null
        // pointer means installation never happened; nothing to free.
        let path_ptr = self.path.load(Acquire);
        if !path_ptr.is_null() {
            // SAFETY: non-null values stored here come from Box::into_raw,
            // and in drop we hold exclusive ownership.
            drop(unsafe { Box::from_raw(path_ptr) });
        }
        let metadata_ptr = self.metadata.load(Acquire);
        if !metadata_ptr.is_null() {
            // SAFETY: same invariant as for `path_ptr` above.
            drop(unsafe { Box::from_raw(metadata_ptr) });
        }
    }
}
impl FileAndMetadata {
    /// Returns the installed metadata, or `None` if
    /// [`FileAndMetadata::install_metadata_and_path`] has not run yet.
    fn metadata(&self) -> Option<&Metadata> {
        let metadata_ptr = self.metadata.load(Acquire);
        // SAFETY: any non-null pointer stored here comes from Box::into_raw
        // in install_metadata_and_path and is only freed in Drop, so it is
        // valid for at least as long as `&self`. `as_ref` handles the null
        // case, replacing the manual is_null/deref dance.
        unsafe { metadata_ptr.as_ref() }
    }

    /// Publish `metadata` and `path` exactly once.
    ///
    /// # Panics
    ///
    /// Panics (via the asserts) if either slot was already populated.
    fn install_metadata_and_path(&self, metadata: Metadata, path: PathBuf) {
        let path_ptr = Box::into_raw(Box::new(path));
        let old_path_ptr = self.path.swap(path_ptr, SeqCst);
        assert!(old_path_ptr.is_null());

        let meta_ptr = Box::into_raw(Box::new(metadata));
        let old_meta_ptr = self.metadata.swap(meta_ptr, SeqCst);
        assert!(old_meta_ptr.is_null());
    }

    /// Returns the installed path, or `None` if not yet installed.
    fn path(&self) -> Option<&PathBuf> {
        let path_ptr = self.path.load(Acquire);
        // SAFETY: see `metadata` — the pointer originates from Box::into_raw
        // and is freed only in Drop.
        unsafe { path_ptr.as_ref() }
    }
}
/// Maps an object's size to a partition id in `0..=3`.
///
/// * `0` — sub-page objects (under 2 KiB)
/// * `1` — page-sized objects (2 KiB ..= 16 KiB)
/// * `2` — block-sized objects (over 16 KiB, up to 4 MiB)
/// * `3` — anything larger
///
/// The object id is accepted for signature compatibility but unused here.
pub fn default_partition_function(_object_id: u64, size: usize) -> u8 {
    const PAGE_MIN: usize = 2048;
    const PAGE_MAX: usize = 16 * 1024;
    const BLOCK_MAX: usize = 4 * 1024 * 1024;

    if size < PAGE_MIN {
        0
    } else if size <= PAGE_MAX {
        1
    } else if size <= BLOCK_MAX {
        2
    } else {
        3
    }
}
/// Open a [`Marble`] store rooted at `path`, using default configuration
/// for everything except the storage directory.
pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Marble> {
    let mut config = Config::default();
    config.path = path.as_ref().into();
    config.open()
}
/// Handle to the object store. Cheap to clone: all shared state lives
/// behind `Arc`s.
#[derive(Clone)]
pub struct Marble {
    // Maps object ids to their current on-disk locations.
    location_table: LocationTable,
    // Highest object id observed; used as the scan bound by
    // free_object_ids / allocated_object_ids.
    max_object_id: Arc<AtomicU64>,
    // Tracks the set of storage files and their stats.
    file_map: FileMap,
    // Configuration this store was opened with.
    config: Config,
    // Handle to the storage directory; fsynced in sync_all after any file
    // syncs, for metadata durability.
    directory_lock: Arc<File>,
    // Records operation history for validation builds only.
    #[cfg(feature = "runtime_validation")]
    debug_history: Arc<std::sync::Mutex<debug_history::DebugHistory>>,
    // Cumulative counters backing stats().
    bytes_read: Arc<AtomicU64>,
    bytes_written: Arc<AtomicU64>,
    high_level_user_bytes_written: Arc<AtomicU64>,
}
impl std::fmt::Debug for Marble {
    /// Debug output shows the current stats snapshot rather than raw fields.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stats = self.stats();
        let mut builder = f.debug_struct("Marble");
        builder.field("stats", &stats);
        builder.finish()
    }
}
impl std::fmt::Display for Marble {
    /// Intentionally terse user-facing form; the Debug impl carries the
    /// detailed statistics.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("Marble { ... }")
    }
}
impl Marble {
    /// Returns a snapshot of statistics about this store.
    #[doc(alias = "file_statistics")]
    #[doc(alias = "statistics")]
    #[doc(alias = "metrics")]
    #[doc(alias = "info")]
    pub fn stats(&self) -> Stats {
        let (fams_len, total_file_size, stored_objects, live_objects) = self.file_map.stats();

        let bytes_read = self.bytes_read.load(Acquire);
        let bytes_written = self.bytes_written.load(Acquire);
        let high_level_user_bytes_written = self.high_level_user_bytes_written.load(Acquire);

        // max(1) avoids 0/0 on a freshly opened store.
        let live_ratio = live_objects as f32 / stored_objects.max(1) as f32;
        let approximate_live_data = live_ratio * total_file_size as f32;

        // NOTE(review): these two ratios can still be NaN/inf before any
        // user writes (zero denominators); treated as advisory values.
        let write_amplification = bytes_written as f32 / high_level_user_bytes_written as f32;
        let space_amplification = total_file_size as f32 / approximate_live_data;

        Stats {
            live_objects,
            stored_objects,
            // saturating_sub guards against a transient live > stored race
            // producing a panic/wrap; the expected invariant is live <= stored.
            dead_objects: stored_objects.saturating_sub(live_objects),
            live_ratio,
            files: fams_len,
            total_file_size,
            bytes_read,
            bytes_written,
            high_level_user_bytes_written,
            write_amplification,
            space_amplification,
        }
    }

    /// Delete any storage files whose live object count has reached zero.
    fn prune_empty_files(&self) -> io::Result<()> {
        self.file_map.prune_empty_files(&self.location_table)
    }

    /// fsync all storage files, then fsync the containing directory (but
    /// only if some file actually required syncing), for durability of
    /// file-level metadata.
    pub fn sync_all(&self) -> io::Result<()> {
        let synced_files = self.file_map.sync_all()?;
        if synced_files {
            fallible!(self.directory_lock.sync_all());
        }
        Ok(())
    }

    /// Returns the next never-allocated object id (`max + 1`) along with an
    /// iterator over every id at or below the current maximum that does not
    /// resolve to a location.
    ///
    /// The closure must capture `&'a self` by value (`move`) so the
    /// returned iterator can outlive this call frame.
    pub fn free_object_ids<'a>(&'a self) -> (u64, impl 'a + Iterator<Item = u64>) {
        let max = self.max_object_id.load(Acquire);
        let iter = (0..=max).filter(move |&oid| self.location_table.load(oid).is_none());
        (max + 1, iter)
    }

    /// Returns an iterator over every object id that currently resolves to
    /// a location.
    pub fn allocated_object_ids<'a>(&'a self) -> impl 'a + Iterator<Item = u64> {
        let max = self.max_object_id.load(Acquire);
        (0..=max).filter(move |&oid| self.location_table.load(oid).is_some())
    }
}