use prometrics::metrics::MetricBuilder;
use std::io::SeekFrom;
use uuid::Uuid;
use block::BlockSize;
use metrics::{DataAllocatorMetrics, StorageMetrics};
use nvm::NonVolatileMemory;
use storage::allocator::DataPortionAllocator;
use storage::data_region::DataRegion;
use storage::header::FULL_HEADER_SIZE;
use storage::index::LumpIndex;
use storage::journal::{JournalRegion, JournalRegionOptions};
use storage::{
Storage, StorageHeader, MAJOR_VERSION, MAX_DATA_REGION_SIZE, MAX_JOURNAL_REGION_SIZE,
MINOR_VERSION,
};
use {ErrorKind, Result};
/// Builder that collects the settings used to create or open a `Storage`.
///
/// Configure it through the setter methods and then call `create` or `open`.
#[derive(Debug, Clone)]
pub struct StorageBuilder {
// Fraction of the space left after the header region that is assigned to the
// journal region when a new storage is created; validated to be within
// `[0.0, 1.0]` by `make_header`.
journal_region_ratio: f64,
// When `Some`, `make_header` stores this UUID in a newly created header and
// `open` requires the stored header's UUID to equal it. When `None`, a v4 UUID
// is generated on creation and no UUID check is made on open.
instance_uuid: Option<Uuid>,
// Options handed to `JournalRegion::open`. Its `block_size` is also used as the
// storage block size by `create`; `open` replaces it with the header's value.
journal: JournalRegionOptions,
// Metric builder passed to the journal region, data allocator, data region and
// storage metrics constructed in `open`.
metrics: MetricBuilder,
}
impl StorageBuilder {
    /// Makes a new `StorageBuilder` with the default settings.
    ///
    /// The defaults are a journal region ratio of `0.01`, no instance UUID,
    /// `JournalRegionOptions::default()` and `MetricBuilder::new()`.
    pub fn new() -> Self {
        StorageBuilder {
            journal_region_ratio: 0.01,
            instance_uuid: None,
            journal: JournalRegionOptions::default(),
            metrics: MetricBuilder::new(),
        }
    }

    /// Sets the instance UUID.
    ///
    /// `create` writes this UUID into the new header (a v4 UUID is generated
    /// when none was set), and `open` fails with `ErrorKind::InvalidInput` if
    /// the stored header's UUID differs from it.
    pub fn instance_uuid(&mut self, uuid: Uuid) -> &mut Self {
        self.instance_uuid = Some(uuid);
        self
    }

    /// Sets the ratio of the journal region to the space available after the
    /// header region.
    ///
    /// The value is only consulted by `create`, which rejects anything outside
    /// `[0.0, 1.0]` with `ErrorKind::InvalidInput`.
    pub fn journal_region_ratio(&mut self, ratio: f64) -> &mut Self {
        self.journal_region_ratio = ratio;
        self
    }

    /// Sets the `gc_queue_size` option of the journal region.
    pub fn journal_gc_queue_size(&mut self, size: usize) -> &mut Self {
        self.journal.gc_queue_size = size;
        self
    }

    /// Sets the `sync_interval` option of the journal region.
    pub fn journal_sync_interval(&mut self, interval: usize) -> &mut Self {
        self.journal.sync_interval = interval;
        self
    }

    /// Sets the block size used when creating a storage.
    ///
    /// `open` ignores this value and uses the block size recorded in the
    /// stored header instead.
    pub fn block_size(&mut self, block_size: BlockSize) -> &mut Self {
        self.journal.block_size = block_size;
        self
    }

    /// Sets the metric builder used for the metrics of the opened storage.
    pub fn metrics(&mut self, metrics: MetricBuilder) -> &mut Self {
        self.metrics = metrics;
        self
    }

    /// Creates a new storage on `nvm` and then opens it.
    ///
    /// The header region and the initial journal region are written starting
    /// at offset 0 of `nvm`, the device is synced, and the result of `open`
    /// is returned.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::InvalidInput` if the configured block size does
    /// not `contains` the block size of `nvm`, or if `make_header` rejects
    /// the capacity / journal region ratio. Failures of the underlying seek,
    /// write, sync and open operations are propagated.
    pub fn create<N>(&self, mut nvm: N) -> Result<Storage<N>>
    where
        N: NonVolatileMemory,
    {
        let storage_block_size = self.journal.block_size;

        // The storage block size has to be compatible with the device's one.
        track_assert!(
            storage_block_size.contains(nvm.block_size()),
            ErrorKind::InvalidInput; storage_block_size, nvm.block_size()
        );

        let header = track!(self.make_header(nvm.capacity(), storage_block_size))?;

        // Write the header followed by the initial journal region in a single
        // aligned write from the beginning of the device.
        track_io!(nvm.seek(SeekFrom::Start(0)))?;
        track!(nvm.aligned_write_all(|mut temp_buf| {
            track!(header.write_header_region_to(&mut temp_buf))?;
            track!(JournalRegion::<N>::initialize(temp_buf, storage_block_size))?;
            Ok(())
        }))?;
        track!(nvm.sync())?;

        track!(self.open(nvm))
    }

    /// Opens the storage already stored on `nvm`.
    ///
    /// The header is read from offset 0. If its minor version is older than
    /// `MINOR_VERSION`, it is bumped and the header region is rewritten. The
    /// device is then split into the journal and data regions, from which the
    /// `Storage` is assembled.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::InvalidInput` if the header's block size does not
    /// `contains` the block size of `nvm`, or if an instance UUID was set on
    /// this builder and differs from the one in the header. Failures while
    /// reading/writing the header or opening the regions are propagated.
    pub fn open<N>(&self, mut nvm: N) -> Result<Storage<N>>
    where
        N: NonVolatileMemory,
    {
        track_io!(nvm.seek(SeekFrom::Start(0)))?;

        // Read the header through the aligned-read helper of the device.
        let buf = track!(nvm.aligned_read_bytes(FULL_HEADER_SIZE as usize))?;
        let mut header = track!(StorageHeader::read_from(&buf[..]))?;

        // A header with an older minor version is upgraded in place.
        // NOTE(review): this rewrite happens before the block-size and UUID
        // checks below, and unlike `create` it is not followed by
        // `nvm.sync()` -- confirm that both are intentional.
        if header.minor_version < MINOR_VERSION {
            header.minor_version = MINOR_VERSION;

            track_io!(nvm.seek(SeekFrom::Start(0)))?;
            track!(nvm.aligned_write_all(|temp_buf| {
                track!(header.write_header_region_to(temp_buf))?;
                Ok(())
            }))?;
        }

        track_assert!(
            header.block_size.contains(nvm.block_size()),
            ErrorKind::InvalidInput
        );

        // The block size recorded in the header takes precedence over the one
        // configured on this builder.
        let mut journal_options = self.journal.clone();
        journal_options.block_size = header.block_size;

        // Verify the instance UUID only when the caller asked for one.
        if let Some(expected_uuid) = self.instance_uuid {
            track_assert_eq!(header.instance_uuid, expected_uuid, ErrorKind::InvalidInput);
        }

        // The index is handed to `JournalRegion::open` mutably; presumably it
        // is populated from the journal contents there (verify in `journal`).
        let mut lump_index = LumpIndex::new();
        let (journal_nvm, data_nvm) = track!(header.split_regions(nvm))?;
        let journal_region = track!(JournalRegion::open(
            journal_nvm,
            &mut lump_index,
            &self.metrics,
            journal_options
        ))?;

        // Build the allocator from the data portions known to the index.
        let allocator = track!(DataPortionAllocator::build(
            DataAllocatorMetrics::new(&self.metrics, header.data_region_size, header.block_size),
            lump_index.data_portions(),
        ))?;

        let data_region = DataRegion::new(&self.metrics, allocator, data_nvm);

        let metrics = StorageMetrics::new(
            &self.metrics,
            &header,
            journal_region.metrics().clone(),
            data_region.metrics().clone(),
        );
        metrics.put_lumps_at_starting.add_u64(lump_index.len());
        Ok(Storage::new(
            header,
            journal_region,
            data_region,
            lump_index,
            metrics,
        ))
    }

    /// Builds the header for a new storage of the given `capacity`.
    ///
    /// The space left after the header region is divided into a journal
    /// region (`journal_region_ratio` of it, passed through
    /// `BlockSize::ceil_align`) and a data region (the remainder, passed
    /// through `BlockSize::floor_align`).
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::InvalidInput` if `capacity` is smaller than the
    /// header region, the ratio is outside `[0.0, 1.0]`, the aligned journal
    /// region does not fit into the available space, or either region exceeds
    /// its maximum size.
    fn make_header(&self, capacity: u64, block_size: BlockSize) -> Result<StorageHeader> {
        let journal_and_data_region_size = track_assert_some!(
            capacity.checked_sub(StorageHeader::calc_region_size(block_size)),
            ErrorKind::InvalidInput,
            "Too small capacity: {}",
            capacity
        );

        // A NaN ratio fails both comparisons and is therefore rejected too.
        track_assert!(
            0.0 <= self.journal_region_ratio && self.journal_region_ratio <= 1.0,
            ErrorKind::InvalidInput,
            "Invalid journal region ratio: {}",
            self.journal_region_ratio
        );

        let journal_region_size =
            (journal_and_data_region_size as f64 * self.journal_region_ratio) as u64;
        let journal_region_size = block_size.ceil_align(journal_region_size);
        track_assert!(
            journal_region_size <= MAX_JOURNAL_REGION_SIZE,
            ErrorKind::InvalidInput,
            "Too large journal region: {} (capacity={}, ratio={})",
            journal_region_size,
            capacity,
            self.journal_region_ratio
        );

        // Aligning the journal size upwards can push it past the space that is
        // actually available (e.g. a ratio of 1.0 with a size that is not a
        // multiple of the block size). Use a checked subtraction so that this
        // is reported as an error instead of underflowing `u64`.
        let remaining_size = track_assert_some!(
            journal_and_data_region_size.checked_sub(journal_region_size),
            ErrorKind::InvalidInput,
            "Journal region does not fit: {} (available={}, capacity={}, ratio={})",
            journal_region_size,
            journal_and_data_region_size,
            capacity,
            self.journal_region_ratio
        );
        let data_region_size = block_size.floor_align(remaining_size);
        track_assert!(
            data_region_size <= MAX_DATA_REGION_SIZE,
            ErrorKind::InvalidInput,
            "Too large data region: {} (capacity={}, journal_region_ratio={})",
            data_region_size,
            capacity,
            self.journal_region_ratio
        );

        Ok(StorageHeader {
            major_version: MAJOR_VERSION,
            minor_version: MINOR_VERSION,
            // Fall back to a freshly generated v4 UUID when none was set.
            instance_uuid: self.instance_uuid.unwrap_or_else(Uuid::new_v4),
            block_size,
            journal_region_size,
            data_region_size,
        })
    }
}
impl Default for StorageBuilder {
fn default() -> Self {
Self::new()
}
}