pub use self::address::Address;
pub use self::builder::StorageBuilder;
pub use self::header::StorageHeader;
pub use self::journal::{JournalEntry, JournalRecord, JournalSnapshot};
pub(crate) use self::data_region::DataRegionLumpData;
use self::data_region::DataRegion;
use self::index::LumpIndex;
use self::journal::JournalRegion;
use self::portion::Portion;
use block::BlockSize;
use lump::{LumpData, LumpDataInner, LumpHeader, LumpId};
use metrics::StorageMetrics;
use nvm::NonVolatileMemory;
use std::ops::Range;
use Result;
mod address;
mod allocator;
mod builder;
mod data_region;
mod header;
mod index;
mod journal;
mod portion;
/// Four-byte magic number, the ASCII bytes `lusf`.
///
/// NOTE(review): presumably stored in the storage header to identify the format — confirm in `StorageHeader`.
pub const MAGIC_NUMBER: [u8; 4] = *b"lusf";
/// Current major version; the tests in this module expect a newly created storage's header to carry it.
pub const MAJOR_VERSION: u16 = 1;
/// Current minor version; the tests in this module expect a newly created storage's header to carry it.
pub const MINOR_VERSION: u16 = 1;
/// Upper bound for the journal region size, equal to `Address::MAX`.
pub const MAX_JOURNAL_REGION_SIZE: u64 = Address::MAX;
/// Upper bound for the data region size: `Address::MAX` multiplied by `BlockSize::MIN`.
///
/// The `as u64` cast applies to `BlockSize::MIN` only (`as` binds tighter than `*`).
pub const MAX_DATA_REGION_SIZE: u64 = Address::MAX * BlockSize::MIN as u64;
/// A lump storage built on top of a `NonVolatileMemory` implementation `N`.
///
/// It combines a journal region and a data region (both backed by `N`) with an
/// in-memory lump index and a set of metrics.
#[derive(Debug)]
pub struct Storage<N>
where
N: NonVolatileMemory,
{
/// Header of this storage; exposed read-only through `header()`.
header: StorageHeader,
/// Journal region: receives put/embed/delete records and holds embedded lump data.
journal_region: JournalRegion<N>,
/// Data region: holds the data of lumps that are not embedded in the journal.
data_region: DataRegion<N>,
/// Maps each lump ID to the portion (journal or data region) that holds its data.
lump_index: LumpIndex,
/// Counters updated by the operations on this storage; exposed through `metrics()`.
metrics: StorageMetrics,
}
impl<N> Storage<N>
where
N: NonVolatileMemory,
{
/// Assembles a `Storage` from already-constructed parts.
///
/// Each argument is stored as-is in the field of the same name; nothing else is done here.
pub(crate) fn new(
header: StorageHeader,
journal_region: JournalRegion<N>,
data_region: DataRegion<N>,
lump_index: LumpIndex,
metrics: StorageMetrics,
) -> Self {
Storage {
header,
journal_region,
data_region,
lump_index,
metrics,
}
}
/// Creates a new storage on `nvm`.
///
/// Shorthand for `StorageBuilder::new().create(nvm)`, i.e. a builder with no options overridden.
///
/// # Errors
///
/// Propagates any error returned by the builder.
pub fn create(nvm: N) -> Result<Self> {
track!(StorageBuilder::new().create(nvm))
}
/// Opens the storage already present on `nvm`.
///
/// Shorthand for `StorageBuilder::new().open(nvm)`, i.e. a builder with no options overridden.
///
/// # Errors
///
/// Propagates any error returned by the builder.
pub fn open(nvm: N) -> Result<Self> {
track!(StorageBuilder::new().open(nvm))
}
/// Returns a reference to the header of this storage.
pub fn header(&self) -> &StorageHeader {
&self.header
}
/// Returns a reference to the metrics of this storage.
pub fn metrics(&self) -> &StorageMetrics {
&self.metrics
}
/// Returns the storage usage that the lump index reports for the lump IDs in `range`.
///
/// The block size of this storage (`header.block_size`) is handed to the index for the computation.
pub fn usage_range(&self, range: Range<LumpId>) -> StorageUsage {
self.lump_index.usage_range(range, self.header.block_size)
}
pub fn get(&mut self, lump_id: &LumpId) -> Result<Option<LumpData>> {
match self.lump_index.get(lump_id) {
None => Ok(None),
Some(portion) => {
let data = match portion {
Portion::Journal(portion) => {
self.metrics.get_journal_lumps.increment();
let bytes = track!(self.journal_region.get_embedded_data(portion))?;
track!(LumpData::new_embedded(bytes))?
}
Portion::Data(portion) => {
self.metrics.get_data_lumps.increment();
track!(self.data_region.get(portion).map(LumpData::from))?
}
};
Ok(Some(data))
}
}
}
/// Returns summary information about the lump identified by `lump_id`, without reading its data.
///
/// Returns `None` when the lump index has no entry for `lump_id`.
/// The reported `approximate_data_size` is the length of the lump's portion,
/// computed with the block size of this storage.
pub fn head(&self, lump_id: &LumpId) -> Option<LumpHeader> {
    let portion = self.lump_index.get(lump_id)?;
    let approximate_data_size = portion.len(self.header.block_size);
    Some(LumpHeader {
        approximate_data_size,
    })
}
/// Returns the IDs of all lumps known to the lump index.
pub fn list(&self) -> Vec<LumpId> {
self.lump_index.list()
}
/// Returns the IDs of the lumps that the lump index reports for `range`.
///
/// NOTE(review): this takes `&mut self` although the body only queries the lump index —
/// confirm whether `&self` would be sufficient.
pub fn list_range(&mut self, range: Range<LumpId>) -> Vec<LumpId> {
self.lump_index.list_range(range)
}
/// Stores `data` under `lump_id`, replacing any lump that already uses that ID.
///
/// Returns `Ok(true)` when a new lump was created and `Ok(false)` when an
/// existing lump was overwritten.
///
/// The previous lump (if any) is removed first, without writing a delete record
/// to the journal. The new data is then written according to its inner
/// representation: embedded data goes into the journal region, data-region data
/// is written as-is, and unaligned data is first copied into a freshly
/// allocated block-aligned buffer.
///
/// NOTE(review): because the old entry is removed before the new write, a failed
/// write leaves the in-memory index without the old lump.
///
/// # Errors
///
/// Propagates errors from the removal of the old lump and from the journal or
/// data region writes.
pub fn put(&mut self, lump_id: &LumpId, data: &LumpData) -> Result<bool> {
    let overwritten = track!(self.delete_if_exists(lump_id, false))?;

    match data.as_inner() {
        LumpDataInner::JournalRegion(embedded) => {
            let recorded =
                self.journal_region
                    .records_embed(&mut self.lump_index, lump_id, embedded);
            track!(recorded)?;
        }
        LumpDataInner::DataRegion(aligned) => {
            track!(self.put_lump_to_data_region(lump_id, aligned))?;
        }
        LumpDataInner::DataRegionUnaligned(bytes) => {
            // The data region needs a block-aligned buffer, so copy the bytes into one.
            let block_size = self.header.block_size;
            let mut aligned = DataRegionLumpData::new(bytes.len(), block_size);
            aligned.as_bytes_mut().copy_from_slice(bytes);
            track!(self.put_lump_to_data_region(lump_id, &aligned))?;
        }
    }

    self.metrics.put_lumps_at_running.increment();
    Ok(!overwritten)
}
/// Deletes the lump identified by `lump_id` and records the deletion in the journal.
///
/// Returns `Ok(true)` when the lump existed and `Ok(false)` when there was nothing to delete.
///
/// # Errors
///
/// Propagates any error from `delete_if_exists`.
pub fn delete(&mut self, lump_id: &LumpId) -> Result<bool> {
track!(self.delete_if_exists(lump_id, true))
}
/// Deletes every lump whose ID is in `range` and returns the IDs that were targeted.
///
/// The targets are listed from the lump index first, then a single range-delete
/// record is written to the journal, and finally each target is removed from the
/// index. Portions that live in the data region are released.
/// `delete_lumps` is incremented once per lump actually removed from the index.
///
/// # Errors
///
/// Propagates the error if the journal record cannot be written; in that case
/// nothing is removed from the index.
pub fn delete_range(&mut self, range: Range<LumpId>) -> Result<Vec<LumpId>> {
    let deleted_ids = self.lump_index.list_range(range.clone());

    let recorded = self
        .journal_region
        .records_delete_range(&mut self.lump_index, range);
    track!(recorded)?;

    for lump_id in &deleted_ids {
        let removed = match self.lump_index.remove(lump_id) {
            Some(removed) => removed,
            None => continue,
        };
        self.metrics.delete_lumps.increment();
        if let Portion::Data(data_portion) = removed {
            self.data_region.delete(data_portion);
        }
    }
    Ok(deleted_ids)
}
/// Allocates a `LumpData` of `size` bytes, aligned to the block size of this storage.
///
/// Delegates to `LumpData::aligned_allocate` with `header.block_size`.
///
/// # Errors
///
/// Propagates any error from `LumpData::aligned_allocate`.
pub fn allocate_lump_data(&self, size: usize) -> Result<LumpData> {
track!(LumpData::aligned_allocate(size, self.header.block_size))
}
/// Allocates a block-aligned `LumpData` and fills it with a copy of `bytes`.
///
/// # Errors
///
/// Propagates any error from `allocate_lump_data`.
///
/// # Panics
///
/// `copy_from_slice` panics if the allocated buffer's length differs from
/// `bytes.len()`; this relies on `allocate_lump_data(n)` exposing exactly `n` bytes.
pub fn allocate_lump_data_with_bytes(&self, bytes: &[u8]) -> Result<LumpData> {
    let size = bytes.len();
    let mut lump = track!(self.allocate_lump_data(size))?;
    lump.as_bytes_mut().copy_from_slice(bytes);
    Ok(lump)
}
/// Runs one unit of the journal region's side job, giving it access to the lump index.
///
/// Whatever the journal region returns on success is discarded.
///
/// NOTE(review): presumably this is incremental journal maintenance (GC and/or sync);
/// confirm in `JournalRegion::run_side_job_once`.
///
/// # Errors
///
/// Propagates any error from the journal region.
pub fn run_side_job_once(&mut self) -> Result<()> {
track!(self.journal_region.run_side_job_once(&mut self.lump_index))?;
Ok(())
}
pub fn journal_sync(&mut self) -> Result<()> {
self.journal_region.sync()
}
pub fn journal_gc(&mut self) -> Result<()> {
self.journal_region.gc_all_entries(&mut self.lump_index)
}
/// Returns a snapshot of the journal region.
///
/// The snapshot carries the `unreleased_head`, `head` and `tail` positions together
/// with the journal entries, exactly as returned by `JournalRegion::journal_entries`.
///
/// # Errors
///
/// Propagates any error from reading the journal entries.
pub fn journal_snapshot(&mut self) -> Result<JournalSnapshot> {
    track!(self.journal_region.journal_entries()).map(
        |(unreleased_head, head, tail, entries)| JournalSnapshot {
            unreleased_head,
            head,
            tail,
            entries,
        },
    )
}
/// Switches the journal region's automatic GC mode on or off.
///
/// The tests in this module turn it off before inspecting journal snapshots.
// `dead_code` is allowed because, within this file, only the tests call this method.
#[allow(dead_code)]
pub(crate) fn set_automatic_gc_mode(&mut self, enable: bool) {
self.journal_region.set_automatic_gc_mode(enable);
}
/// Writes `data` to the data region and registers it under `lump_id`.
///
/// Steps, in order:
/// 1. write `data` to the data region, obtaining the portion it occupies;
/// 2. record the put in the journal;
/// 3. insert the portion into the lump index.
///
/// If step 2 fails, the portion written in step 1 is released again before the
/// error is returned, so the data region does not keep an unreferenced portion.
///
/// # Errors
///
/// Propagates errors from the data region write and from the journal record.
fn put_lump_to_data_region(
    &mut self,
    lump_id: &LumpId,
    data: &DataRegionLumpData,
) -> Result<()> {
    let portion = track!(self.data_region.put(data))?;

    let recorded = self
        .journal_region
        .records_put(&mut self.lump_index, lump_id, portion);
    if recorded.is_err() {
        // Roll back the data region write; nothing references this portion yet.
        self.data_region.delete(portion);
    }
    track!(recorded)?;

    self.lump_index.insert(*lump_id, Portion::Data(portion));
    Ok(())
}
/// Removes the lump identified by `lump_id` if the lump index has an entry for it.
///
/// Returns `Ok(true)` when an entry was removed and `Ok(false)` when there was none.
///
/// When an entry exists, the steps are, in order: remove it from the index,
/// increment `delete_lumps`, write a delete record to the journal (only when
/// `do_record` is `true`), and release the portion if it lives in the data region.
///
/// # Errors
///
/// Propagates the error if the journal record cannot be written. At that point
/// the entry has already been removed from the index and its data-region
/// portion has not been released.
fn delete_if_exists(&mut self, lump_id: &LumpId, do_record: bool) -> Result<bool> {
    let removed = match self.lump_index.remove(lump_id) {
        Some(removed) => removed,
        None => return Ok(false),
    };
    self.metrics.delete_lumps.increment();

    if do_record {
        let recorded = self
            .journal_region
            .records_delete(&mut self.lump_index, lump_id);
        track!(recorded)?;
    }

    if let Portion::Data(data_portion) = removed {
        self.data_region.delete(data_portion);
    }
    Ok(true)
}
}
/// Amount of storage in use, as returned by `Storage::usage_range`.
#[derive(Debug, Clone)]
pub enum StorageUsage {
    /// The usage is not known.
    Unknown,
    /// An approximate usage figure; `bytecount` returns it as a byte count.
    Approximate(u64),
}

impl StorageUsage {
    /// Builds a `StorageUsage::Approximate` holding `usage`.
    pub fn approximate(usage: u64) -> Self {
        StorageUsage::Approximate(usage)
    }

    /// Builds a `StorageUsage::Unknown`.
    pub fn unknown() -> Self {
        StorageUsage::Unknown
    }

    /// Returns the approximate byte count, or `None` when the usage is unknown.
    pub fn bytecount(&self) -> Option<u64> {
        if let StorageUsage::Approximate(bytes) = *self {
            Some(bytes)
        } else {
            None
        }
    }
}

impl Default for StorageUsage {
    /// The default usage is `StorageUsage::Unknown`.
    fn default() -> Self {
        StorageUsage::unknown()
    }
}
#[cfg(test)]
mod tests {
use std::fs::OpenOptions;
use std::mem;
use tempdir::TempDir;
use trackable::result::TestResult;
use super::*;
use block::BlockSize;
use lump::{LumpData, LumpId};
use nvm::{FileNvm, SharedMemoryNvm};
use ErrorKind;
/// Smoke test: put/get/head/delete on a file-backed storage, then reopen it and list the lumps.
#[test]
fn it_works() -> TestResult {
let dir = track_io!(TempDir::new("cannyls_test"))?;
let nvm = track!(FileNvm::create(
dir.path().join("test.lusf"),
BlockSize::min().ceil_align(1024 * 1024)
))?;
let mut storage = track!(Storage::create(nvm))?;
// A fresh storage has no lumps.
assert!(storage.get(&id("000"))?.is_none());
// `put` returns `true` for a new lump and `false` when it overwrites an existing one.
assert!(storage.put(&id("000"), &data("hello"))?);
assert!(!storage.put(&id("000"), &data("hello"))?);
assert_eq!(storage.get(&id("000"))?, Some(data("hello")));
// "hello" is five bytes long.
assert_eq!(
storage.head(&id("000")).map(|h| h.approximate_data_size),
Some(5)
);
// `delete` returns `true` only when the lump existed.
assert!(storage.delete(&id("000"))?);
assert!(!storage.delete(&id("000"))?);
assert!(storage.get(&id("000"))?.is_none());
assert!(storage.head(&id("000")).is_none());
assert!(storage.put(&id("000"), &data("hello"))?);
assert!(storage.put(&id("111"), &data("world"))?);
// Interleave side-job runs with a put/delete pair of a third lump.
for _ in 0..10 {
track!(storage.run_side_job_once())?;
assert!(storage.put(&id("222"), &data("quux"))?);
assert!(storage.delete(&id("222"))?);
}
// Drop the storage and reopen the same file: only the two surviving lumps are listed.
mem::drop(storage);
let nvm = track!(FileNvm::open(dir.path().join("test.lusf")))?;
let storage = track!(Storage::open(nvm))?;
assert_eq!(storage.list(), vec![id("000"), id("111")]);
Ok(())
}
/// Checks the behaviour when the storage runs out of space.
#[test]
fn full() -> TestResult {
let dir = track_io!(TempDir::new("cannyls_test"))?;
let nvm = track!(FileNvm::create(
dir.path().join("test.lusf"),
BlockSize::min().ceil_align(1024 * 1024)
))?;
let mut storage = track!(Storage::create(nvm))?;
// A 512 KiB lump fits.
assert_eq!(
track!(storage.put(&id("000"), &zeroed_data(512 * 1024)))?,
true
);
// Overwriting the same lump succeeds and reports an overwrite (`false`).
assert_eq!(
storage.put(&id("000"), &zeroed_data(512 * 1024)).ok(),
Some(false)
);
// A second 512 KiB lump does not fit and fails with `StorageFull`.
assert_eq!(
storage
.put(&id("111"), &zeroed_data(512 * 1024))
.err()
.map(|e| *e.kind()),
Some(ErrorKind::StorageFull)
);
// After deleting the first lump the second one can be stored.
assert_eq!(storage.delete(&id("000")).ok(), Some(true));
assert_eq!(
storage.put(&id("111"), &zeroed_data(512 * 1024)).ok(),
Some(true)
);
Ok(())
}
/// Round-trips a lump of `LumpData::MAX_SIZE` bytes through a file-backed storage.
#[test]
fn max_size_lump() -> TestResult {
let dir = track_io!(TempDir::new("cannyls_test"))?;
let nvm = track!(FileNvm::create(
dir.path().join("test.lusf"),
BlockSize::min().ceil_align(100 * 1024 * 1024)
))?;
let mut storage = track!(Storage::create(nvm))?;
let data = zeroed_data(LumpData::MAX_SIZE);
assert_eq!(track!(storage.put(&id("000"), &data))?, true);
assert_eq!(track!(storage.get(&id("000")))?, Some(data));
Ok(())
}
/// Test helper: parses `id` into a `LumpId`, panicking if the string is not a valid ID.
fn id(id: &str) -> LumpId {
id.parse().unwrap()
}
/// Test helper: builds an embedded `LumpData` from the bytes of `data`, panicking on failure.
fn data(data: &str) -> LumpData {
LumpData::new_embedded(Vec::from(data)).unwrap()
}
/// Test helper: allocates a `LumpData` of `size` bytes, aligned to `BlockSize::min()`,
/// and sets every byte to zero. Panics if the allocation fails.
fn zeroed_data(size: usize) -> LumpData {
    let mut lump = LumpData::aligned_allocate(size, BlockSize::min()).unwrap();
    lump.as_bytes_mut().iter_mut().for_each(|byte| *byte = 0);
    lump
}
/// Checks that a storage whose header carries an older minor version can be opened,
/// and that opening it brings the header (in memory and in the file) to the current version.
#[test]
fn open_older_compatible_version_works() -> TestResult {
let dir = track_io!(TempDir::new("cannyls_test"))?;
let path = dir.path().join("test.lusf");
// Create a storage and keep a copy of its header; it carries the current version.
let mut header = {
let nvm = track!(FileNvm::create(&path, 1024 * 1024))?;
let storage = track!(Storage::create(nvm))?;
let header = storage.header().clone();
assert_eq!(header.major_version, MAJOR_VERSION);
assert_eq!(header.minor_version, MINOR_VERSION);
header
};
// Lower the minor version by one and write the header back to the file.
{
header.minor_version = header
.minor_version
.checked_sub(1)
// The message below states that this test assumes `MINOR_VERSION >= 1`.
.expect("このテストは`MINOR_VERSION >= 1`であることを前提としている");
let file = track_any_err!(OpenOptions::new().write(true).open(&path))?;
track!(header.write_to(file))?;
}
// Opening succeeds and the opened storage reports the current version.
{
let nvm = track!(FileNvm::open(&path))?;
let storage = track!(Storage::open(nvm))?;
let header = storage.header().clone();
assert_eq!(header.major_version, MAJOR_VERSION);
assert_eq!(header.minor_version, MINOR_VERSION);
}
// The header read straight from the file now carries the current version as well.
{
let file = track_any_err!(OpenOptions::new().read(true).open(&path))?;
let header = track!(StorageHeader::read_from(file))?;
assert_eq!(header.major_version, MAJOR_VERSION);
assert_eq!(header.minor_version, MINOR_VERSION);
}
Ok(())
}
/// Checks which combinations of NVM block size and storage block size are accepted by `create`.
///
/// NOTE(review): the accepted cases are those where the storage block size is a multiple of
/// the NVM block size — presumably that is the rule being enforced; confirm in `StorageBuilder`.
#[test]
fn block_size_check_when_create() -> TestResult {
// NVM 1024 / storage 1024: accepted.
let nvm_block_size = track!(BlockSize::new(1024))?;
let storage_block_size = track!(BlockSize::new(1024))?;
let storage = track!(StorageBuilder::new()
.block_size(storage_block_size)
.create(memory_nvm(nvm_block_size)))?;
assert_eq!(storage.header().block_size, storage_block_size);
// NVM 512 / storage 1024: accepted.
let nvm_block_size = track!(BlockSize::new(512))?;
let storage_block_size = track!(BlockSize::new(1024))?;
let storage = track!(StorageBuilder::new()
.block_size(storage_block_size)
.create(memory_nvm(nvm_block_size)))?;
assert_eq!(storage.header().block_size, storage_block_size);
// NVM 1024 / storage 512: rejected.
let nvm_block_size = track!(BlockSize::new(1024))?;
let storage_block_size = track!(BlockSize::new(512))?;
assert!(StorageBuilder::new()
.block_size(storage_block_size)
.create(memory_nvm(nvm_block_size))
.is_err());
// NVM 1024 / storage 1536: rejected.
let nvm_block_size = track!(BlockSize::new(1024))?;
let storage_block_size = track!(BlockSize::new(1536))?;
assert!(StorageBuilder::new()
.block_size(storage_block_size)
.create(memory_nvm(nvm_block_size))
.is_err());
Ok(())
}
/// Checks which NVM block sizes are accepted when opening a storage created with block size 1536.
///
/// NOTE(review): the accepted NVM block sizes (1536, 512) divide 1536 and the rejected ones
/// (2048, 1024) do not — presumably that is the rule being enforced; confirm in `StorageBuilder`.
#[test]
fn block_size_check_when_open() -> TestResult {
let initial_nvm_block_size = track!(BlockSize::new(1536))?;
let storage_block_size = track!(BlockSize::new(1536))?;
let mut nvm = memory_nvm(initial_nvm_block_size);
assert!(StorageBuilder::new()
.block_size(storage_block_size)
.create(nvm.clone())
.is_ok());
// Same NVM block size as at creation time: accepted.
let storage = track!(Storage::open(nvm.clone()))?;
assert_eq!(storage.header().block_size, storage_block_size);
// NVM block size 512: accepted.
nvm.set_block_size(track!(BlockSize::new(512))?);
let storage = track!(Storage::open(nvm.clone()))?;
assert_eq!(storage.header().block_size, storage_block_size);
// NVM block size 2048: rejected.
nvm.set_block_size(track!(BlockSize::new(2048))?);
assert!(Storage::open(nvm.clone()).is_err());
// NVM block size 1024: rejected.
nvm.set_block_size(track!(BlockSize::new(1024))?);
assert!(Storage::open(nvm).is_err());
Ok(())
}
/// Test helper: builds a `SharedMemoryNvm` over a zero-filled 1 MiB buffer with the given block size.
fn memory_nvm(block_size: BlockSize) -> SharedMemoryNvm {
SharedMemoryNvm::with_block_size(vec![0; 1024 * 1024], block_size)
}
/// Test helper: returns `true` when `entry` holds a `JournalRecord::Put` for the lump `id`.
fn is_put_with(entry: &JournalEntry, id: &LumpId) -> bool {
    match entry.record {
        JournalRecord::Put(recorded_id, _) => recorded_id == *id,
        _ => false,
    }
}
/// Test helper: returns `true` when `entry` holds a `JournalRecord::Delete` for the lump `id`.
fn is_delete_with(entry: &JournalEntry, id: &LumpId) -> bool {
    match entry.record {
        JournalRecord::Delete(recorded_id) => recorded_id == *id,
        _ => false,
    }
}
/// Exercises `journal_gc` with automatic GC turned off.
///
/// A GC over records of live lumps must keep those records, and a GC after
/// the lumps have been deleted must leave the journal empty.
#[test]
fn full_gc_works() -> TestResult {
    let dir = track_io!(TempDir::new("cannyls_test"))?;
    let nvm = track!(FileNvm::create(
        dir.path().join("test.lusf"),
        BlockSize::min().ceil_align(1024 * 1024)
    ))?;
    let mut storage = track!(Storage::create(nvm))?;
    storage.set_automatic_gc_mode(false);

    // Two puts produce two put records, in order.
    assert!(storage.put(&id("000"), &zeroed_data(42))?);
    assert!(storage.put(&id("010"), &zeroed_data(42))?);
    let entries = storage.journal_snapshot().unwrap().entries;
    assert_eq!(entries.len(), 2);
    assert!(is_put_with(entries.get(0).unwrap(), &id("000")));
    assert!(is_put_with(entries.get(1).unwrap(), &id("010")));

    // Both lumps are still alive, so GC must preserve their records.
    storage.journal_gc().unwrap();
    let new_entries = storage.journal_snapshot().unwrap().entries;
    // `zip` stops at the shorter iterator; without this check the loop below
    // would pass vacuously if GC had dropped the live records.
    assert_eq!(entries.len(), new_entries.len());
    for (e1, e2) in entries.iter().zip(new_entries.iter()) {
        assert_eq!(e1.record, e2.record);
    }

    // Deleting both lumps appends two delete records after the put records.
    assert!(storage.delete(&id("000"))?);
    assert!(storage.delete(&id("010"))?);
    let entries = storage.journal_snapshot().unwrap().entries;
    assert_eq!(entries.len(), 4);
    assert!(is_put_with(entries.get(0).unwrap(), &id("000")));
    assert!(is_put_with(entries.get(1).unwrap(), &id("010")));
    assert!(is_delete_with(entries.get(2).unwrap(), &id("000")));
    assert!(is_delete_with(entries.get(3).unwrap(), &id("010")));

    // Nothing is alive any more, so GC leaves the journal empty.
    storage.journal_gc().unwrap();
    let entries = storage.journal_snapshot().unwrap().entries;
    assert_eq!(entries.len(), 0);
    Ok(())
}
/// Walks the journal of a small storage through two full GCs and pins the exact
/// `unreleased_head` / `head` / `tail` positions after each step.
///
/// The expected positions depend on the exact sequence of operations below.
#[test]
fn journal_overflow_example() -> TestResult {
let dir = track_io!(TempDir::new("cannyls_test"))?;
let nvm = track!(FileNvm::create(
dir.path().join("test.lusf"),
BlockSize::min().ceil_align(1024 * 400)
))?;
let mut storage = track!(StorageBuilder::new().journal_region_ratio(0.01).create(nvm))?;
storage.set_automatic_gc_mode(false);
// With this ratio the journal region is 4096 in size.
{
let header = storage.header();
assert_eq!(header.journal_region_size, 4096);
}
// 60 puts followed by 20 deletes.
for i in 0..60 {
assert!(storage.put(&id(&i.to_string()), &zeroed_data(42))?);
}
for i in 0..20 {
assert!(storage.delete(&id(&i.to_string()))?);
}
// Nothing has been collected yet: the head is still at 0.
{
let snapshot = track!(storage.journal_snapshot())?;
assert_eq!(snapshot.unreleased_head, 0);
assert_eq!(snapshot.head, 0);
assert_eq!(snapshot.tail, 2100);
}
// First GC: the head moves to the old tail and the tail advances further.
track!(storage.journal_gc())?;
{
let snapshot = track!(storage.journal_snapshot())?;
assert_eq!(snapshot.unreleased_head, 2100);
assert_eq!(snapshot.head, 2100);
assert_eq!(snapshot.tail, 3220);
}
// Second GC: the tail ends up below the head.
// NOTE(review): presumably the journal wrapped around to the front of its region — confirm in `JournalRegion`.
track!(storage.journal_gc())?;
{
let snapshot = track!(storage.journal_snapshot())?;
assert_eq!(snapshot.unreleased_head, 3220);
assert_eq!(snapshot.head, 3220);
assert_eq!(snapshot.tail, 784);
}
Ok(())
}
/// Regression test named after "PR 23": pins the journal positions across side-job runs,
/// reopening the storage, and a write that leaves the tail below the head.
///
/// NOTE(review): the original problem is not described in this file — see the project history.
/// The expected positions depend on the exact sequence of operations below.
#[test]
fn confirm_that_the_problem_of_pr23_is_resolved() -> TestResult {
let dir = track_io!(TempDir::new("cannyls_test"))?;
let nvm = track!(FileNvm::create(
dir.path().join("test.lusf"),
BlockSize::min().ceil_align(1024 * 100 * 4)
))?;
let mut storage = track!(StorageBuilder::new().journal_region_ratio(0.01).create(nvm))?;
assert_eq!(storage.header().journal_region_size, 4096);
storage.set_automatic_gc_mode(false);
// Store a 10-byte embedded lump, then run the side job four times.
let test_lump_id = id("55");
let vec: Vec<u8> = vec![42; 10];
let lump_data = track!(LumpData::new_embedded(vec))?;
track!(storage.put(&test_lump_id, &lump_data))?;
track!(storage.run_side_job_once())?; track!(storage.run_side_job_once())?; track!(storage.run_side_job_once())?; track!(storage.run_side_job_once())?; {
// At this point `head` is ahead of `unreleased_head`.
let snapshot = storage.journal_snapshot().unwrap();
assert_eq!(snapshot.unreleased_head, 33);
assert_eq!(snapshot.head, 66);
assert_eq!(snapshot.tail, 66);
}
// Reopen the storage from the file.
std::mem::drop(storage);
let nvm = track!(FileNvm::open(dir.path().join("test.lusf")))?;
let mut storage = track!(Storage::open(nvm))?;
storage.set_automatic_gc_mode(false);
{
// After reopening, `head` equals `unreleased_head` again (33, not 66).
let snapshot = storage.journal_snapshot().unwrap();
assert_eq!(snapshot.unreleased_head, 33);
assert_eq!(snapshot.head, 33); assert_eq!(snapshot.tail, 66);
}
// Three rounds of putting a 1000-byte embedded lump and deleting it again.
for _ in 0..3 {
let vec: Vec<u8> = vec![42; 1000];
let lump_data = track!(LumpData::new_embedded(vec))?;
track!(storage.put(&test_lump_id, &lump_data))?;
track!(storage.delete(&test_lump_id))?;
}
track!(storage.run_side_job_once())?; track!(storage.run_side_job_once())?; track!(storage.run_side_job_once())?; {
// After three side-job runs all three positions coincide.
let snapshot = storage.journal_snapshot().unwrap();
assert_eq!(snapshot.unreleased_head, 3198);
assert_eq!(snapshot.head, 3198);
assert_eq!(snapshot.tail, 3198);
}
// A 2000-byte embedded lump leaves the tail below the head.
// NOTE(review): presumably the record did not fit behind the tail and was written at the front — confirm in `JournalRegion`.
let vec: Vec<u8> = vec![42; 2000];
let lump_data = track!(LumpData::new_embedded(vec))?;
track!(storage.put(&test_lump_id, &lump_data))?;
{
let snapshot = storage.journal_snapshot().unwrap();
assert_eq!(snapshot.unreleased_head, 3198);
assert_eq!(snapshot.head, 3198);
assert_eq!(snapshot.tail, 2023);
}
// Reopening must reproduce exactly the same positions.
std::mem::drop(storage);
let nvm = track!(FileNvm::open(dir.path().join("test.lusf")))?;
let mut storage = track!(Storage::open(nvm))?;
{
let snapshot = storage.journal_snapshot().unwrap();
assert_eq!(snapshot.unreleased_head, 3198);
assert_eq!(snapshot.head, 3198);
assert_eq!(snapshot.tail, 2023);
}
Ok(())
}
}