use byteorder::{BigEndian, ByteOrder};
use prometrics::metrics::MetricBuilder;
use std::io::{Read, SeekFrom, Write};
use block::{AlignedBytes, BlockSize};
use metrics::DataRegionMetrics;
use nvm::NonVolatileMemory;
use storage::allocator::DataPortionAllocator;
use storage::portion::DataPortion;
use {ErrorKind, Result};
/// Size in bytes of the trailer stored in the last bytes of every lump data buffer.
///
/// The trailer holds the number of alignment padding bytes as a big-endian `u16`.
const LUMP_DATA_TRAILER_SIZE: usize = 2;
/// Region of a non-volatile memory in which lump data is stored.
///
/// Space is handed out in units of blocks by a `DataPortionAllocator`;
/// the data itself is written to and read from `nvm`.
#[derive(Debug)]
pub struct DataRegion<N> {
/// Allocator that hands out and takes back portions of this region.
allocator: DataPortionAllocator,
/// Backing memory that lump data is written to and read from.
nvm: N,
/// Block size (taken from the allocator's metrics) used to convert a
/// portion's start and length into a byte offset and a byte size.
block_size: BlockSize,
/// Metrics of this region.
metrics: DataRegionMetrics,
}
impl<N> DataRegion<N>
where
    N: NonVolatileMemory,
{
    /// Makes a new `DataRegion` that stores data on `nvm` and manages its space with `allocator`.
    ///
    /// The block size and the capacity of the region are taken from the allocator's metrics.
    pub fn new(metric_builder: &MetricBuilder, allocator: DataPortionAllocator, nvm: N) -> Self {
        let capacity = allocator.metrics().capacity_bytes;
        let block_size = allocator.metrics().block_size;
        let allocator_metrics = allocator.metrics().clone();
        DataRegion {
            allocator,
            nvm,
            block_size,
            metrics: DataRegionMetrics::new(metric_builder, capacity, allocator_metrics),
        }
    }

    /// Returns the metrics of this region.
    pub fn metrics(&self) -> &DataRegionMetrics {
        &self.metrics
    }

    /// Stores `data` in the region and returns the portion it occupies.
    ///
    /// # Errors
    ///
    /// - `ErrorKind::InvalidInput` if the block size of `data` does not contain the
    ///   block size of this region, or if `data` needs more than `0xFFFF` blocks.
    /// - `ErrorKind::StorageFull` if the allocator cannot provide a large enough portion.
    /// - An I/O error if seeking, writing or flushing the NVM fails. In that case the
    ///   portion allocated for `data` is released again before the error is returned.
    pub fn put(&mut self, data: &DataRegionLumpData) -> Result<DataPortion> {
        track_assert!(
            data.block_size().contains(self.block_size),
            ErrorKind::InvalidInput
        );

        // The allocator takes the block count as a `u16`. Reject anything that needs
        // more blocks than that up front; otherwise the casts below would silently
        // truncate and the data would be written past the end of a too-small portion.
        // (This bound also guarantees that the `u32` arithmetic in `block_count`
        // cannot overflow.)
        let external_len = data.as_external_bytes().len() as u64;
        let max_len = u64::from(self.block_size.as_u16()) * 0xFFFF;
        track_assert!(external_len <= max_len, ErrorKind::InvalidInput);

        // Despite its name, this is the number of blocks to allocate.
        let block_size = self.block_count(external_len as u32) as u16;
        let portion =
            track_assert_some!(self.allocator.allocate(block_size), ErrorKind::StorageFull);

        let written = track!(self.write_portion(&portion, data));
        if let Err(e) = written {
            // The caller never gets to see this portion, so nobody else could ever
            // release it: give it back to the allocator instead of leaking it.
            self.allocator.release(portion);
            return Err(e);
        }
        Ok(portion)
    }

    /// Reads back the lump data stored in `portion`.
    ///
    /// # Errors
    ///
    /// Returns an error if seeking or reading the NVM fails, or if the portion is
    /// too small to contain the data trailer.
    pub fn get(&mut self, portion: DataPortion) -> Result<DataRegionLumpData> {
        let (offset, size) = self.real_portion(&portion);
        track_io!(self.nvm.seek(SeekFrom::Start(offset)))?;

        let buf = AlignedBytes::new(size, self.block_size);
        let data = track!(DataRegionLumpData::read_from(&mut self.nvm, buf))?;
        Ok(data)
    }

    /// Releases `portion` back to the allocator.
    ///
    /// Only the allocator is updated; this method does not touch the NVM.
    pub fn delete(&mut self, portion: DataPortion) {
        self.allocator.release(portion);
    }

    /// Writes `data` at the location of `portion` on the NVM and flushes the NVM.
    fn write_portion(&mut self, portion: &DataPortion, data: &DataRegionLumpData) -> Result<()> {
        let (offset, _size) = self.real_portion(portion);
        track_io!(self.nvm.seek(SeekFrom::Start(offset)))?;
        track!(data.write_to(&mut self.nvm))?;
        track_io!(self.nvm.flush())?;
        Ok(())
    }

    /// Converts a block-based portion into a `(byte offset, byte size)` pair.
    fn real_portion(&self, portion: &DataPortion) -> (u64, usize) {
        let offset = portion.start.as_u64() * u64::from(self.block_size.as_u16());
        let size = portion.len as usize * self.block_size.as_u16() as usize;
        (offset, size)
    }

    /// Returns the number of blocks needed to hold `size` bytes (rounded up).
    fn block_count(&self, size: u32) -> u32 {
        (size + u32::from(self.block_size.as_u16()) - 1) / u32::from(self.block_size.as_u16())
    }
}
/// Lump data in the form in which it is stored in a data region.
///
/// The buffer consists of the data itself, followed by alignment padding,
/// followed by a two-byte trailer that records the padding length.
#[derive(Debug, Clone)]
pub struct DataRegionLumpData {
/// Block-aligned buffer holding the data, the padding and the trailer.
bytes: AlignedBytes,
/// Number of leading bytes of `bytes` that make up the actual data.
data_size: usize,
}
impl DataRegionLumpData {
    /// Makes a buffer able to hold `data_size` bytes of data, aligned to `block_size`.
    ///
    /// Room for the trailer is added to `data_size` before aligning, and the number of
    /// padding bytes introduced by the alignment is written into the trailer as a
    /// big-endian `u16`. The data bytes are meant to be filled in via `as_bytes_mut`.
    pub fn new(data_size: usize, block_size: BlockSize) -> Self {
        let unpadded_len = data_size + LUMP_DATA_TRAILER_SIZE;
        let mut bytes = AlignedBytes::new(unpadded_len, block_size);
        bytes.align();

        let aligned_len = bytes.len();
        let padding_len = aligned_len - unpadded_len;
        debug_assert!(padding_len <= 0xFFFF);
        {
            // The trailer occupies the very last bytes of the aligned buffer.
            let trailer = &mut bytes[aligned_len - LUMP_DATA_TRAILER_SIZE..];
            BigEndian::write_u16(trailer, padding_len as u16);
        }
        DataRegionLumpData { bytes, data_size }
    }

    /// Returns the block size of the underlying buffer.
    pub fn block_size(&self) -> BlockSize {
        self.bytes.block_size()
    }

    /// Returns the data part of the buffer (without padding and trailer).
    pub fn as_bytes(&self) -> &[u8] {
        let data_end = self.data_size;
        &self.bytes[..data_end]
    }

    /// Returns the data part of the buffer (without padding and trailer) for modification.
    pub fn as_bytes_mut(&mut self) -> &mut [u8] {
        let data_end = self.data_size;
        &mut self.bytes[..data_end]
    }

    /// Returns the whole buffer, i.e. data, padding and trailer, as it is stored.
    fn as_external_bytes(&self) -> &[u8] {
        self.bytes.as_ref()
    }

    /// Writes the whole buffer (data, padding and trailer) to `writer`.
    fn write_to<W: Write>(&self, mut writer: W) -> Result<()> {
        let stored = self.as_external_bytes();
        track_io!(writer.write_all(stored))
    }

    /// Fills `buf` completely from `reader` and interprets it as stored lump data.
    ///
    /// The data size is derived from the padding length found in the trailer. If the
    /// trailer claims more padding than the buffer can contain, the data size saturates
    /// to zero instead of producing an error.
    fn read_from<R: Read>(mut reader: R, mut buf: AlignedBytes) -> Result<Self> {
        track_assert!(buf.len() >= LUMP_DATA_TRAILER_SIZE, ErrorKind::InvalidInput);
        track_io!(reader.read_exact(&mut buf))?;

        let total_len = buf.len();
        let padding_len = {
            let trailer = &buf[total_len - LUMP_DATA_TRAILER_SIZE..];
            BigEndian::read_u16(trailer) as usize
        };
        let overhead = LUMP_DATA_TRAILER_SIZE + padding_len;
        let data_size = total_len.saturating_sub(overhead);

        Ok(DataRegionLumpData {
            bytes: buf,
            data_size,
        })
    }
}
#[cfg(test)]
mod tests {
    use prometrics::metrics::MetricBuilder;
    use std::iter;
    use trackable::result::TestResult;

    use super::super::allocator::DataPortionAllocator;
    use super::*;
    use block::BlockSize;
    use metrics::DataAllocatorMetrics;
    use nvm::MemoryNvm;

    /// Stores a small lump in a memory-backed region and checks that reading the
    /// returned portion yields the same bytes.
    #[test]
    fn data_region_works() -> TestResult {
        // Set up an empty allocator and a memory NVM of the same capacity.
        let capacity = 10 * 1024;
        let block_size = BlockSize::min();
        let metrics = MetricBuilder::new();
        let allocator_metrics = DataAllocatorMetrics::new(&metrics, capacity, block_size);
        let allocator = track!(DataPortionAllocator::build(
            allocator_metrics,
            iter::empty(),
        ))?;
        let nvm = MemoryNvm::new(vec![0; capacity as usize]);
        let mut region = DataRegion::new(&metrics, allocator, nvm);

        // Put
        let mut data = DataRegionLumpData::new(3, block_size);
        data.as_bytes_mut().copy_from_slice(b"foo");
        let portion = track!(region.put(&data))?;

        // Get
        let stored = region.get(portion).ok().map(|d| d.as_bytes().to_owned());
        assert_eq!(stored, Some(b"foo".to_vec()));
        Ok(())
    }
}