use std::fs::File;
use std::path::{Path, PathBuf};
use std::collections::BTreeMap;
use std::io::{Read, Seek, SeekFrom};
use std::ops::Bound::*;
use std::cmp::min;
use std::marker::PhantomData;
use crate::Error;
use crate::header;
use crate::record;
use crate::Store;
use crate::{ReadOnly, Writable, WriteOpenMode};
/// State common to read-only and writable stores: the backing file plus an
/// in-memory index of the data found in it.
pub(crate) struct StoreBase {
/// Path the store was opened from; `compact` derives its temporary file
/// name from it and renames the compacted file onto it.
path: PathBuf,
/// Open handle to the backing file.
file: File,
/// Index keyed by logical offset; each value records the span's length and
/// where its bytes live in `file`.
spans: BTreeMap<u64, Span>,
/// Byte counter handed by `&mut` to the `header`/`record` routines and set
/// from `header::write_header`'s return value.
/// NOTE(review): presumably the current end of valid data in `file` —
/// confirm against the `header` and `record` modules.
file_size: u64,
}
impl StoreBase {
    /// Logical size of the store: the end offset (`key + len`) of the span
    /// with the highest key, or `0` when no spans are indexed.
    pub fn size(&self) -> u64 {
        match self.spans.last_key_value() {
            Some((start, last)) => *start + last.len,
            None => 0,
        }
    }
}
/// One contiguous run of stored bytes, as indexed by `StoreBase::spans`
/// (the map key is the run's logical offset).
pub(crate) struct Span {
/// Number of bytes in the span.
pub len: u64,
/// Position in the backing file where the span's bytes start; `read`
/// seeks here (plus any intra-span skip) before copying data out.
pub file_data_offset: u64,
/// Set to `true` by `validate_range` once the record has passed
/// `validate_record_with_retry`; spans with `false` are the ones it checks.
pub validated: bool,
}
fn read_newfile(base: &mut StoreBase, compatible: fn(&header::HeaderVer) -> bool) -> Result<(), Error>
{
let hver = header::read_header(&mut base.file, &mut base.file_size)?;
if !compatible(&hver) {
return Err(Error::UnsupportedVersion);
}
while let Some(record) = record::read_next_record(&mut base.file, &mut base.file_size)? {
record::add_record(&mut base.spans,
record.hdr.logical_offset,
record.hdr.length,
record.file_data_offset, true);
}
Ok(())
}
pub fn open_readonly<P: AsRef<Path>>(
path: P,
) -> Result<Store<ReadOnly>, Error> {
let path = path.as_ref().to_path_buf();
let mut oo = std::fs::OpenOptions::new();
oo.read(true);
let file = oo.open(&path)?;
let mut base = StoreBase {
path: path,
file: file,
spans: BTreeMap::new(),
file_size: 0,
};
read_newfile(&mut base, header::HeaderVer::is_read_compatible)?;
Ok(Store {base, writable: false, _mode: PhantomData })
}
/// Opens (and, depending on `mode`, creates) the backing file for a writable
/// store and returns the populated `StoreBase`.
///
/// * `WriteOpenMode::MustExist` — open without creating.
/// * `WriteOpenMode::MustNotExist` — create with `create_new`, which fails if
///   the file is already there.
/// * `WriteOpenMode::MayExist` — create the file when it is missing.
///
/// A zero-length file gets a fresh header, which is synced to disk; any other
/// file is loaded through `read_newfile` and must satisfy
/// `HeaderVer::is_write_compatible`.
///
/// # Errors
/// Propagates I/O errors from opening, querying, writing or syncing the file,
/// and any error from `read_newfile`.
pub(crate) fn open_writable_base<P: AsRef<Path>>(
    path: P,
    mode: WriteOpenMode,
) -> Result<StoreBase, Error> {
    let path = path.as_ref().to_path_buf();

    let mut options = std::fs::OpenOptions::new();
    options.read(true).write(true);
    let file = match mode {
        WriteOpenMode::MustExist => options.create(false),
        WriteOpenMode::MustNotExist => options.create_new(true),
        WriteOpenMode::MayExist => options.create(true),
    }
    .open(&path)?;

    let mut base = StoreBase {
        path,
        file,
        spans: BTreeMap::new(),
        file_size: 0,
    };

    let on_disk_len = base.file.metadata()?.len();
    if on_disk_len != 0 {
        read_newfile(&mut base, header::HeaderVer::is_write_compatible)?;
    } else {
        // Empty file: start it off with a header and make that durable.
        base.file_size = header::write_header(&mut base.file)?;
        base.file.sync_all()?;
    }
    Ok(base)
}
pub fn open<P: AsRef<Path>>(
path: P,
mode: WriteOpenMode,
) -> Result<Store<Writable>, Error> {
Ok(Store {base: open_writable_base::<P>(path, mode)?,
writable: true,
_mode: PhantomData})
}
fn validate_record_with_retry(
file: &mut File,
file_data_offset: u64,
length: u64,
) -> Result<(), Error> {
if record::validate(file, file_data_offset, length as usize)? {
return Ok(());
}
file.sync_data()?;
if record::validate(file, file_data_offset, length as usize)? {
return Ok(());
}
Err(Error::CorruptRecord)
}
impl<M> Store<M>
{
pub fn size(&self) -> u64 {
self.base.size()
}
fn prev_offset(&self, offset: u64) -> u64 {
self.base.spans
.range((Included(0), Excluded(offset)))
.next_back()
.map(|(&off, _)| off)
.unwrap_or(0)
}
fn validate_range(&mut self, start: u64, end: u64) -> Result<(), Error> {
if !self.writable {
return Ok(());
}
let to_validate: Vec<(u64, u64, u64)> = self.base.spans
.range((Included(start), Excluded(end)))
.filter_map(|(&off, span)| {
if span.validated {
None
} else {
Some((off, span.file_data_offset, span.len))
}
})
.collect();
for &(_, file_data_offset, length) in &to_validate {
validate_record_with_retry(&mut self.base.file, file_data_offset, length)?;
}
for &(off, _, _) in &to_validate {
let span = self.base.spans.get_mut(&off).unwrap();
span.validated = true;
}
Ok(())
}
pub fn read(&mut self, mut offset: u64, mut buf: &mut [u8]) -> Result<(), Error> {
buf.fill(0);
let prev = self.prev_offset(offset);
self.validate_range(prev, offset + buf.len() as u64)?;
if let Some(span) = self.base.spans.get(&prev) {
if prev + span.len > offset {
let bytes_before = offset - prev;
let len = min(span.len - bytes_before, buf.len() as u64);
self.base.file.seek(SeekFrom::Start(span.file_data_offset + bytes_before))?;
self.base.file.read_exact(&mut buf[..len as usize])?;
offset += len;
buf = &mut buf[len as usize..];
}
}
for (&off, span) in self.base.spans.range((Included(offset), Excluded(offset + buf.len() as u64))) {
let bytes_until_span = off - offset;
if bytes_until_span != 0 {
offset += bytes_until_span;
buf = &mut buf[bytes_until_span as usize..];
}
let len = min(span.len, buf.len() as u64);
self.base.file.seek(SeekFrom::Start(span.file_data_offset))?;
self.base.file.read_exact(&mut buf[..len as usize])?;
offset += len;
buf = &mut buf[len as usize..];
}
Ok(())
}
}
fn compact(base: &mut StoreBase) -> Result<StoreBase, Error> {
let path = base.path.clone();
let tmp = path.with_extension("compact");
let mut oo = std::fs::OpenOptions::new();
oo.write(true);
oo.create(true);
oo.truncate(true);
let mut file = oo.open(&tmp)?;
let mut file_len = header::write_header(&mut file)?;
let mut data = vec![0u8; base.size() as usize];
for (off, span) in &base.spans {
base.file.seek(SeekFrom::Start(span.file_data_offset))?;
base.file.read_exact(&mut data[*off as usize..(*off + span.len) as usize])?;
}
record::write_record(&mut file, 0, &data, &mut file_len)?;
file.sync_data()?;
std::fs::rename(&tmp, &path)?;
let parent = path.parent().unwrap();
let dir = File::open(parent)?;
dir.sync_all()?;
open_writable_base(&path, WriteOpenMode::MustExist)
}
impl Store<Writable> {
    /// Appends `buf` to the backing file as one or more records placed at
    /// logical `offset`, splitting it into pieces of at most
    /// `record::MAX_RECORD_SIZE` bytes, and indexes each piece as a
    /// not-yet-validated span.
    ///
    /// Spans around the target range are validated before anything is
    /// written. After the write the store may be compacted (see the size
    /// check at the end).
    ///
    /// # Errors
    /// Propagates validation failures, errors from `record::write_record`,
    /// and any error from `compact`.
    pub fn write(&mut self, mut offset: u64, mut buf: &[u8]) -> Result<(), Error> {
        let prev = self.prev_offset(offset);
        self.validate_range(prev, offset + buf.len() as u64)?;

        while !buf.is_empty() {
            let take = min(buf.len(), record::MAX_RECORD_SIZE);
            let data_off = record::write_record(
                &mut self.base.file,
                offset,
                &buf[..take],
                &mut self.base.file_size,
            )?;
            record::add_record(&mut self.base.spans, offset, take as u64, data_off, false);
            offset += take as u64;
            buf = &buf[take..];
        }

        // NOTE(review): `file_size * 100 > size()` holds whenever the logical
        // size is under 100x the file size, so once a dense store passes
        // 1_000_000 bytes this looks like it compacts after every write.
        // Confirm whether `file_size > size() * N` was intended.
        let file_bytes = self.base.file_size;
        let should_compact = file_bytes > 1_000_000 && file_bytes * 100 > self.size();
        if should_compact {
            let end = self.size();
            self.validate_range(0, end)?;
            self.base = compact(&mut self.base)?;
        }
        Ok(())
    }

    /// Validates every span in the store and converts it into a read-only
    /// store that keeps the same `StoreBase`.
    ///
    /// # Errors
    /// Propagates any validation failure; the store is consumed either way.
    pub fn into_readonly(mut self) -> Result<Store<ReadOnly>, Error> {
        let end = self.size();
        self.validate_range(0, end)?;
        Ok(Store {
            base: self.base,
            writable: false,
            _mode: PhantomData,
        })
    }
}
#[cfg(test)]
#[test]
fn empty_store_size_zero() {
    // A store created from scratch has no spans, so its size is zero.
    let tmp = tempfile::tempdir().unwrap();
    let store = open(tmp.path().join("s"), WriteOpenMode::MayExist).unwrap();
    assert_eq!(store.size(), 0);
}
#[test]
fn write_then_read() {
    // Bytes written at offset 0 come back unchanged and define the size.
    let tmp = tempfile::tempdir().unwrap();
    let mut store = open(tmp.path().join("s"), WriteOpenMode::MayExist).unwrap();

    store.write(0, b"hello").unwrap();
    assert_eq!(store.size(), 5);

    let mut out = [0u8; 5];
    store.read(0, &mut out).unwrap();
    assert_eq!(&out, b"hello");
}
#[test]
fn overwrite_middle() {
    // A later write replaces only the bytes it covers.
    let tmp = tempfile::tempdir().unwrap();
    let mut store = open(tmp.path().join("s"), WriteOpenMode::MayExist).unwrap();

    store.write(0, b"abcdefgh").unwrap();
    store.write(2, b"XYZ").unwrap();

    let mut out = [0u8; 8];
    store.read(0, &mut out).unwrap();
    assert_eq!(&out, b"abXYZfgh");
}
#[test]
fn holes_are_zero() {
    // Bytes in front of the first span were never written and read as zero.
    let tmp = tempfile::tempdir().unwrap();
    let mut store = open(tmp.path().join("s"), WriteOpenMode::MayExist).unwrap();

    store.write(10, b"hi").unwrap();

    let mut out = [0u8; 12];
    store.read(0, &mut out).unwrap();
    assert_eq!(&out[..10], &[0u8; 10]);
    assert_eq!(&out[10..12], b"hi");
}
#[test]
fn replay_reconstructs_state() {
    // Reopening the file must rebuild the same contents from its records.
    let tmp = tempfile::tempdir().unwrap();
    let store_path = tmp.path().join("s");

    {
        // Scope the first handle so it is dropped before the file is reopened.
        let mut writer = open(&store_path, WriteOpenMode::MayExist).unwrap();
        writer.write(0, b"abc").unwrap();
        writer.write(5, b"xyz").unwrap();
    }

    let mut reopened = open(&store_path, WriteOpenMode::MustExist).unwrap();
    let mut out = [0u8; 8];
    reopened.read(0, &mut out).unwrap();
    assert_eq!(&out, b"abc\0\0xyz");
}