use memmap;
use tempfile::tempfile;
use crate::core::ser::{
self, BinWriter, FixedLength, ProtocolVersion, Readable, Reader, StreamingReader, Writeable,
Writer,
};
use std::fmt::Debug;
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufReader, BufWriter, Seek, SeekFrom, Write};
use std::marker;
use std::path::{Path, PathBuf};
/// Location of one variable-length element inside a data file.
#[derive(Debug, Clone)]
pub struct SizeEntry {
    /// Byte offset at which the element starts in the data file.
    pub offset: u64,
    /// Length of the element in bytes.
    pub size: u16,
}
impl FixedLength for SizeEntry {
    /// Serialized length in bytes: a `u64` offset followed by a `u16` size.
    const LEN: usize = std::mem::size_of::<u64>() + std::mem::size_of::<u16>();
}
impl Readable for SizeEntry {
    /// Reads the `u64` offset followed by the `u16` size from `reader`.
    fn read(reader: &mut dyn Reader) -> Result<SizeEntry, ser::Error> {
        let offset = reader.read_u64()?;
        let size = reader.read_u16()?;
        Ok(SizeEntry { offset, size })
    }
}
impl Writeable for SizeEntry {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
writer.write_u64(self.offset)?;
writer.write_u16(self.size)?;
Ok(())
}
}
/// Describes how element boundaries are located within a data file.
pub enum SizeInfo {
    /// Every element occupies the same number of bytes, given here.
    FixedSize(u16),
    /// Elements vary in length; the boxed file holds one `SizeEntry`
    /// (offset and size) per element of the data file.
    VariableSize(Box<AppendOnlyFile<SizeEntry>>),
}
/// File of serialized `T` elements that callers address by 1-based position.
pub struct DataFile<T> {
    /// Underlying append-only storage, addressed by 0-based element index.
    file: AppendOnlyFile<T>,
}
impl<T> DataFile<T>
where
    T: Readable + Writeable + Debug,
{
    /// Opens the data file at `path` by delegating to `AppendOnlyFile::open`.
    ///
    /// # Errors
    /// Propagates any I/O error raised while opening the underlying file.
    pub fn open<P>(
        path: P,
        size_info: SizeInfo,
        version: ProtocolVersion,
    ) -> io::Result<DataFile<T>>
    where
        P: AsRef<Path> + Debug,
    {
        let file = AppendOnlyFile::open(path, size_info, version)?;
        Ok(DataFile { file })
    }

    /// Serializes `data` and appends it to the underlying file's buffer.
    ///
    /// Returns the number of elements in the file, counting those that have
    /// been appended but not yet flushed.
    pub fn append(&mut self, data: &T) -> io::Result<u64> {
        self.file.append_elmt(data)?;
        Ok(self.size_unsync())
    }

    /// Reads the element at the 1-based `position`.
    ///
    /// Returns `None` when `position` is 0 (there is no element before the
    /// first one) or when the element cannot be read or deserialized.
    pub fn read(&self, position: u64) -> Option<T> {
        // `position - 1` would underflow for 0; treat it as "no such element".
        let index = position.checked_sub(1)?;
        self.file.read_as_elmt(index).ok()
    }

    /// Rewinds the underlying file to `position`; see `AppendOnlyFile::rewind`.
    pub fn rewind(&mut self, position: u64) {
        self.file.rewind(position)
    }

    /// Flushes buffered data of the underlying file to disk.
    pub fn flush(&mut self) -> io::Result<()> {
        self.file.flush()
    }

    /// Discards buffered, unflushed changes of the underlying file.
    pub fn discard(&mut self) {
        self.file.discard()
    }

    /// Number of elements in the file as reported by the underlying file;
    /// 0 if that size cannot be determined.
    pub fn size(&self) -> u64 {
        self.file.size_in_elmts().unwrap_or(0)
    }

    /// Number of elements including unflushed ones; 0 if it cannot be
    /// determined.
    fn size_unsync(&self) -> u64 {
        self.file.size_unsync_in_elmts().unwrap_or(0)
    }

    /// Path of the underlying file.
    pub fn path(&self) -> &Path {
        self.file.path()
    }

    /// Copies the underlying file into a fresh temporary file and returns it.
    pub fn as_temp_file(&self) -> io::Result<File> {
        self.file.as_temp_file()
    }

    /// Releases the file handle and memory map held by the underlying file.
    pub fn release(&mut self) {
        self.file.release();
    }

    /// Rewrites the file without the elements at the given 1-based positions.
    ///
    /// A position of 0 does not refer to any element and is ignored rather
    /// than underflowing when converted to a 0-based index.
    pub fn save_prune(&mut self, prune_pos: &[u64]) -> io::Result<()> {
        let prune_idx: Vec<u64> = prune_pos
            .iter()
            .filter_map(|pos| pos.checked_sub(1))
            .collect();
        self.file.save_prune(&prune_idx)
    }
}
/// Append-only file of serialized elements with an in-memory write buffer.
///
/// Appended bytes accumulate in `buffer` until `flush` writes them to disk;
/// data already on disk is read through a memory map.
pub struct AppendOnlyFile<T> {
    /// Location of the file on disk.
    path: PathBuf,
    /// Open handle to the file; `None` before `init` and after `release`.
    file: Option<File>,
    /// How element offsets and sizes are determined.
    size_info: SizeInfo,
    /// Protocol version handed to the (de)serialization routines.
    version: ProtocolVersion,
    /// Read-only map of the on-disk contents; `None` when the file is empty
    /// or released.
    mmap: Option<memmap::Mmap>,
    /// Bytes appended since the last flush.
    buffer: Vec<u8>,
    /// Element index at which `buffer` starts.
    buffer_start_pos: u64,
    /// Value of `buffer_start_pos` saved by `rewind`; 0 when no rewind is
    /// pending.
    buffer_start_pos_bak: u64,
    /// Ties the file to its element type without storing a `T`.
    _marker: marker::PhantomData<T>,
}
impl AppendOnlyFile<SizeEntry> {
    /// Adds up the `size` field of every flushed entry, i.e. the entries at
    /// indices `0..buffer_start_pos`.
    fn sum_sizes(&self) -> io::Result<u64> {
        (0..self.buffer_start_pos).try_fold(0u64, |total, pos| -> io::Result<u64> {
            let entry = self.read_as_elmt(pos)?;
            Ok(total + u64::from(entry.size))
        })
    }
}
impl<T> AppendOnlyFile<T>
where
T: Debug + Readable + Writeable,
{
/// Opens (creating it if missing) the file at `path` and prepares it for use.
///
/// For variable-size files the size file is checked against the data file:
/// if the sizes it records do not add up to the data file's length, the size
/// file is rebuilt and the file re-initialized.
pub fn open<P>(
    path: P,
    size_info: SizeInfo,
    version: ProtocolVersion,
) -> io::Result<AppendOnlyFile<T>>
where
    P: AsRef<Path> + Debug,
{
    let mut aof = AppendOnlyFile {
        path: path.as_ref().to_path_buf(),
        file: None,
        size_info,
        version,
        mmap: None,
        buffer: Vec::new(),
        buffer_start_pos: 0,
        buffer_start_pos_bak: 0,
        _marker: marker::PhantomData,
    };
    aof.init()?;

    let data_len = aof.size()?;
    let size_file_stale = match aof.size_info {
        SizeInfo::VariableSize(ref size_file) => size_file.sum_sizes()? != data_len,
        SizeInfo::FixedSize(_) => false,
    };
    if size_file_stale {
        aof.rebuild_size_file()?;
        aof.init()?;
    }

    Ok(aof)
}
/// (Re)opens the file in read/append mode, creating it if needed, maps it
/// into memory when it is non-empty and resets `buffer_start_pos` to the
/// number of elements currently on disk.
///
/// A variable-size file initializes its size file first.
pub fn init(&mut self) -> io::Result<()> {
    if let SizeInfo::VariableSize(size_file) = &mut self.size_info {
        size_file.init()?;
    }

    let handle = OpenOptions::new()
        .read(true)
        .append(true)
        .create(true)
        .open(&self.path)?;
    self.file = Some(handle);

    if self.size()? == 0 {
        // Nothing on disk, so there is nothing to map.
        self.buffer_start_pos = 0;
        return Ok(());
    }

    let file = self.file.as_ref().unwrap();
    // NOTE(review): mapping a file is unsafe because the mapping is affected
    // by changes made to the file behind it; presumably nothing else touches
    // this file while it is mapped - confirm.
    let mapped = unsafe { memmap::Mmap::map(file) }?;
    self.mmap = Some(mapped);
    self.buffer_start_pos = self.size_in_elmts()?;
    Ok(())
}
/// Number of elements currently on disk.
///
/// Fixed-size files divide the file length by the element size;
/// variable-size files ask the size file for its own element count.
fn size_in_elmts(&self) -> io::Result<u64> {
    match self.size_info {
        SizeInfo::VariableSize(ref size_file) => size_file.size_in_elmts(),
        SizeInfo::FixedSize(elmt_size) => {
            let file_len = self.size()?;
            Ok(file_len / u64::from(elmt_size))
        }
    }
}
/// Number of elements including those still sitting in the write buffer.
///
/// Fixed-size files add the buffered element count to `buffer_start_pos`;
/// variable-size files ask the size file for its own unsynced count.
fn size_unsync_in_elmts(&self) -> io::Result<u64> {
    match self.size_info {
        SizeInfo::VariableSize(ref size_file) => size_file.size_unsync_in_elmts(),
        SizeInfo::FixedSize(elmt_size) => {
            let buffered_elmts = self.buffer.len() as u64 / u64::from(elmt_size);
            Ok(self.buffer_start_pos + buffered_elmts)
        }
    }
}
/// Serializes `data` with the file's protocol version and appends the
/// resulting bytes via `append`.
///
/// Serialization failures are reported as `io::ErrorKind::Other`.
fn append_elmt(&mut self, data: &T) -> io::Result<()> {
    let mut serialized = match ser::ser_vec(data, self.version) {
        Ok(bytes) => bytes,
        Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e)),
    };
    self.append(&mut serialized)
}
/// Appends the raw bytes of one element to the in-memory buffer.
///
/// For variable-size files a `SizeEntry` describing the element is appended
/// to the size file first; its offset is the end of the previous entry, or 0
/// for the very first element. Nothing is written to the data file until
/// `flush` is called.
///
/// # Errors
/// For variable-size files, returns `io::ErrorKind::InvalidInput` when
/// `bytes` is longer than a `SizeEntry` can record (`u16::MAX` bytes), and
/// propagates errors from reading or appending to the size file.
pub fn append(&mut self, bytes: &mut [u8]) -> io::Result<()> {
    if let SizeInfo::VariableSize(size_file) = &mut self.size_info {
        // The size is stored as `u16`; a plain cast would silently truncate
        // and leave the size file out of step with the data file.
        if bytes.len() > usize::from(u16::max_value()) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!(
                    "element of {} bytes exceeds the maximum of {} bytes",
                    bytes.len(),
                    u16::max_value()
                ),
            ));
        }
        let size = bytes.len() as u16;

        let next_pos = size_file.size_unsync_in_elmts()?;
        let offset = if next_pos == 0 {
            0
        } else {
            let prev_entry = size_file.read_as_elmt(next_pos - 1)?;
            prev_entry.offset + u64::from(prev_entry.size)
        };
        size_file.append_elmt(&SizeEntry { offset, size })?;
    }

    self.buffer.extend_from_slice(bytes);
    Ok(())
}
fn offset_and_size(&self, pos: u64) -> io::Result<(u64, u16)> {
match self.size_info {
SizeInfo::FixedSize(elmt_size) => Ok((pos * elmt_size as u64, elmt_size)),
SizeInfo::VariableSize(ref size_file) => {
let entry = size_file.read_as_elmt(pos)?;
Ok((entry.offset, entry.size))
}
}
}
/// Marks the file as rewound so that `buffer_start_pos` becomes `pos`.
///
/// The first rewind since the last flush/discard saves the current
/// `buffer_start_pos` so that `discard` can restore it; the on-disk
/// truncation itself happens in `flush`. A variable-size file rewinds its
/// size file to the same position.
pub fn rewind(&mut self, pos: u64) {
    match self.size_info {
        SizeInfo::VariableSize(ref mut size_file) => size_file.rewind(pos),
        SizeInfo::FixedSize(_) => {}
    }

    // Keep the oldest backup: repeated rewinds must not overwrite it.
    if self.buffer_start_pos_bak == 0 {
        self.buffer_start_pos_bak = self.buffer_start_pos;
    }
    self.buffer_start_pos = pos;
}
/// Writes the buffered bytes to disk and refreshes the memory map.
///
/// Steps, in order: flush the size file (variable-size files only); if a
/// rewind is pending, truncate the data file to the end of the last retained
/// element; reopen the file in append mode; write and sync the buffer; clear
/// the buffer; recompute `buffer_start_pos`; remap the file (or drop the map
/// when the file is empty).
///
/// # Errors
/// Propagates any I/O error from the size file, from opening, truncating,
/// writing or syncing the file, or from creating the memory map.
pub fn flush(&mut self) -> io::Result<()> {
// The size file is flushed first; `offset_and_size` below reads from it.
if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
size_file.flush()?
}
// A non-zero backup position means `rewind` was called since the last flush.
if self.buffer_start_pos_bak > 0 {
// Drop the map and the handle before changing the file's length.
self.mmap = None;
self.file = None;
{
// Opened with `write` rather than `append` for the `set_len` calls below.
let file = OpenOptions::new()
.read(true)
.create(true)
.write(true)
.open(&self.path)?;
if self.buffer_start_pos == 0 {
// Rewound to the very start: nothing is retained.
file.set_len(0)?;
} else {
// New length is the end of the last retained element.
let (offset, size) =
self.offset_and_size(self.buffer_start_pos.saturating_sub(1))?;
file.set_len(offset + size as u64)?;
};
}
}
{
// Always reopen in append mode so the buffer lands at the end of the file.
let file = OpenOptions::new()
.read(true)
.create(true)
.append(true)
.open(&self.path)?;
self.file = Some(file);
// The pending rewind (if any) has been applied.
self.buffer_start_pos_bak = 0;
}
// `self.file` was set to `Some` just above, so these unwraps cannot fail.
self.file.as_mut().unwrap().write_all(&self.buffer[..])?;
self.file.as_mut().unwrap().sync_all()?;
self.buffer.clear();
// Everything is on disk now; the (empty) buffer starts after the last element.
self.buffer_start_pos = self.size_in_elmts()?;
// Only a non-empty file is mapped.
if self.file.as_ref().unwrap().metadata()?.len() == 0 {
self.mmap = None;
} else {
// NOTE(review): the map is presumably only sound while nothing else
// modifies the file underneath it - confirm.
self.mmap = Some(unsafe { memmap::Mmap::map(&self.file.as_ref().unwrap())? });
}
Ok(())
}
/// Throws away unflushed changes.
///
/// A pending rewind is undone by restoring the saved `buffer_start_pos`,
/// the size file (if any) discards its own changes, and the write buffer is
/// emptied.
pub fn discard(&mut self) {
    let saved_pos = self.buffer_start_pos_bak;
    if saved_pos > 0 {
        self.buffer_start_pos = saved_pos;
        self.buffer_start_pos_bak = 0;
    }

    if let SizeInfo::VariableSize(size_file) = &mut self.size_info {
        size_file.discard();
    }

    self.buffer = Vec::new();
}
/// Returns the raw bytes of the element at index `pos`.
///
/// An empty slice is returned when `pos` is at or beyond the current
/// (unsynced) element count. Elements below `buffer_start_pos` are served
/// from the memory map, the rest from the write buffer.
pub fn read(&self, pos: u64) -> io::Result<&[u8]> {
    if pos >= self.size_unsync_in_elmts()? {
        let empty: &[u8] = &[];
        return Ok(empty);
    }

    let (offset, length) = self.offset_and_size(pos)?;
    if pos < self.buffer_start_pos {
        return Ok(self.read_from_mmap(offset, length));
    }

    // Offsets are relative to the whole file; rebase onto the buffer start.
    let (buffer_offset, _) = self.offset_and_size(self.buffer_start_pos)?;
    let relative_offset = offset.saturating_sub(buffer_offset);
    Ok(self.read_from_buffer(relative_offset, length))
}
/// Reads the element at index `pos` and deserializes it into a `T`.
///
/// Deserialization failures are reported as `io::ErrorKind::Other`.
fn read_as_elmt(&self, pos: u64) -> io::Result<T> {
    let mut raw = self.read(pos)?;
    ser::deserialize(&mut raw, self.version)
        .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
}
/// Returns `length` bytes of the write buffer starting at `offset`, or an
/// empty slice when that range is not fully inside the buffer.
fn read_from_buffer(&self, offset: u64, length: u16) -> &[u8] {
    let start = offset as usize;
    let end = start + length as usize;
    self.buffer.get(start..end).unwrap_or_default()
}
/// Returns `length` bytes of the memory map starting at `offset`, or an
/// empty slice when there is no map or the range is not fully inside it.
fn read_from_mmap(&self, offset: u64, length: u16) -> &[u8] {
    self.mmap
        .as_ref()
        .and_then(|mmap| {
            let start = offset as usize;
            let end = start + length as usize;
            mmap.get(start..end)
        })
        .unwrap_or_default()
}
/// Copies the on-disk contents into a new temporary file and returns that
/// file, positioned at its start.
pub fn as_temp_file(&self) -> io::Result<File> {
    let source = File::open(&self.path)?;
    let mut reader = BufReader::new(source);
    let mut writer = BufWriter::new(tempfile()?);

    io::copy(&mut reader, &mut writer)?;
    // Rewind so the caller starts reading from the beginning.
    writer.seek(SeekFrom::Start(0))?;

    Ok(writer.into_inner()?)
}
pub fn save_prune(&mut self, prune_pos: &[u64]) -> io::Result<()> {
let tmp_path = self.path.with_extension("tmp");
{
let reader = File::open(&self.path)?;
let mut buf_reader = BufReader::new(reader);
let mut streaming_reader = StreamingReader::new(&mut buf_reader, self.version);
let mut buf_writer = BufWriter::new(File::create(&tmp_path)?);
let mut bin_writer = BinWriter::new(&mut buf_writer, self.version);
let mut current_pos = 0;
let mut prune_pos = prune_pos;
while let Ok(elmt) = T::read(&mut streaming_reader) {
if prune_pos.contains(¤t_pos) {
prune_pos = &prune_pos[1..];
} else {
elmt.write(&mut bin_writer)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
}
current_pos += 1;
}
buf_writer.flush()?;
}
self.replace(&tmp_path)?;
if let SizeInfo::VariableSize(_) = &self.size_info {
self.rebuild_size_file()?;
}
self.init()?;
Ok(())
}
/// Regenerates the size file by streaming through the data file and
/// recording the offset and size of every element that can be read.
///
/// Does nothing for fixed-size files.
fn rebuild_size_file(&mut self) -> io::Result<()> {
    let size_file = match self.size_info {
        SizeInfo::VariableSize(ref mut size_file) => size_file,
        SizeInfo::FixedSize(_) => return Ok(()),
    };

    let tmp_path = size_file.path.with_extension("tmp");
    debug!("rebuild_size_file: {:?}", tmp_path);

    // Scope so that both files are closed before the size file is replaced.
    {
        let data_file = File::open(&self.path)?;
        let mut buf_reader = BufReader::new(data_file);
        let mut streaming_reader = StreamingReader::new(&mut buf_reader, self.version);

        let mut buf_writer = BufWriter::new(File::create(&tmp_path)?);
        let mut bin_writer = BinWriter::new(&mut buf_writer, self.version);

        let mut current_offset = 0;
        while T::read(&mut streaming_reader).is_ok() {
            // Bytes consumed by the element just read.
            let consumed = streaming_reader.total_bytes_read();
            let size = consumed.saturating_sub(current_offset) as u16;

            SizeEntry {
                offset: current_offset,
                size,
            }
            .write(&mut bin_writer)
            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;

            current_offset += size as u64;
        }
        buf_writer.flush()?;
    }

    size_file.replace(&tmp_path)?;
    Ok(())
}
/// Replaces this file on disk with the file at `with`.
///
/// The handle and memory map are released first, then the current file is
/// removed and `with` is renamed into its place.
fn replace<P>(&mut self, with: P) -> io::Result<()>
where
    P: AsRef<Path> + Debug,
{
    self.release();
    fs::remove_file(&self.path)?;
    fs::rename(with, &self.path)
}
/// Drops the memory map and the file handle, and does the same for the size
/// file of a variable-size file.
pub fn release(&mut self) {
    drop(self.mmap.take());
    drop(self.file.take());

    match self.size_info {
        SizeInfo::VariableSize(ref mut size_file) => size_file.release(),
        SizeInfo::FixedSize(_) => {}
    }
}
/// Current length in bytes of the file on disk, taken from its metadata.
pub fn size(&self) -> io::Result<u64> {
    let metadata = fs::metadata(&self.path)?;
    Ok(metadata.len())
}
/// Location of the file on disk.
pub fn path(&self) -> &Path {
    self.path.as_path()
}
}