use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write};
use std::sync::Arc;
use fail::fail_point;
use log::warn;
use crate::env::{FileSystem, Handle, WriteExt};
use crate::metrics::*;
use crate::pipe_log::FileBlockHandle;
use crate::{Error, Result};
use super::format::LogFileFormat;
/// Cap (2 MiB) on the speculative part of a single preallocation step in
/// `LogFileWriter::write`. A step may still exceed this when the pending
/// write itself needs more room than that.
const FILE_ALLOCATE_SIZE: usize = 2 << 20;
/// Builds a [`LogFileWriter`] for the file behind `handle`.
///
/// A writer is requested from `system` for the handle and then passed to
/// `LogFileWriter::open` together with `format` and `force_reset`.
///
/// # Errors
///
/// Returns an error if the file system cannot create the writer or if
/// opening the log file writer fails.
pub(super) fn build_file_writer<F: FileSystem>(
    system: &F,
    handle: Arc<F::Handle>,
    format: LogFileFormat,
    force_reset: bool,
) -> Result<LogFileWriter<F>> {
    // The file system takes its own reference; the original one is kept by
    // the returned writer.
    let file_writer = system.new_writer(Arc::clone(&handle))?;
    LogFileWriter::open(handle, file_writer, format, force_reset)
}
/// Sequential writer for a single log file, tracking how many bytes have
/// been written and how much space the file is known to occupy.
pub struct LogFileWriter<F: FileSystem> {
/// Shared handle of the underlying file; queried for the file size on open
/// and used by `sync`.
handle: Arc<F::Handle>,
/// File-system writer that performs the actual writes, seeks, allocations
/// and truncations.
writer: F::Writer,
/// Number of bytes written so far; also the position the next write goes
/// to (reported by `offset`).
written: usize,
/// Tracked size of the file, including space added through `allocate`.
/// Always at least `written` after a successful write.
capacity: usize,
}
impl<F: FileSystem> LogFileWriter<F> {
    /// Wraps `writer` for writing to the file behind `handle`.
    ///
    /// The current file size is used as both the initial write offset and
    /// the initial capacity. If the file is shorter than the encoded header
    /// of `format.version`, or `force_reset` is set, the header is written
    /// from offset 0; otherwise the writer is positioned at the end of the
    /// file.
    ///
    /// After a forced reset the capacity still covers the old contents, so
    /// bytes past the header remain in the file until they are overwritten
    /// or removed by [`truncate`](Self::truncate).
    fn open(
        handle: Arc<F::Handle>,
        writer: F::Writer,
        format: LogFileFormat,
        force_reset: bool,
    ) -> Result<Self> {
        let existing_len = handle.file_size()?;
        let mut this = Self {
            handle,
            writer,
            written: existing_len,
            capacity: existing_len,
        };
        let header_len = LogFileFormat::encoded_len(format.version);
        let keep_contents = existing_len >= header_len && !force_reset;
        if keep_contents {
            this.writer.seek(SeekFrom::Start(existing_len as u64))?;
        } else {
            this.write_header(format)?;
        }
        Ok(this)
    }

    /// Rewinds to the start of the file, resets the write offset to zero and
    /// writes the encoded `format` header.
    ///
    /// # Panics
    ///
    /// Panics if encoding `format` into the in-memory buffer fails.
    fn write_header(&mut self, format: LogFileFormat) -> IoResult<()> {
        self.writer.rewind()?;
        self.written = 0;
        let header_len = LogFileFormat::encoded_len(format.version);
        let mut header = Vec::with_capacity(header_len);
        format.encode(&mut header).unwrap();
        self.write(header.as_slice(), 0)
    }

    /// Finishes writing: drops any unused preallocated tail, then syncs.
    pub fn close(&mut self) -> IoResult<()> {
        self.truncate().and_then(|()| self.sync())
    }

    /// Shrinks the file to the number of bytes actually written.
    ///
    /// Does nothing when no space beyond the written bytes is tracked.
    pub fn truncate(&mut self) -> IoResult<()> {
        if self.written >= self.capacity {
            return Ok(());
        }
        // Test hook: when this failpoint is active the truncation is skipped
        // and the call reports success.
        fail_point!("file_pipe_log::log_file_writer::skip_truncate", |_| {
            Ok(())
        });
        self.writer.truncate(self.written)?;
        self.capacity = self.written;
        Ok(())
    }

    /// Writes all of `buf` at the current offset.
    ///
    /// When the write would run past the tracked capacity, more space is
    /// requested first: enough for `buf`, or, if larger, the distance to
    /// `target_size_hint` capped at `FILE_ALLOCATE_SIZE`. An allocation
    /// failure is only logged; the capacity is advanced regardless and the
    /// write is still attempted.
    ///
    /// # Errors
    ///
    /// Returns the write error after moving the writer back to the last
    /// successfully written offset.
    ///
    /// # Panics
    ///
    /// Panics if the write fails and that repositioning fails as well.
    pub fn write(&mut self, buf: &[u8], target_size_hint: usize) -> IoResult<()> {
        let end = self.written + buf.len();
        if end > self.capacity {
            let _t = StopWatch::new(&*LOG_ALLOCATE_DURATION_HISTOGRAM);
            let shortfall = end - self.capacity;
            let speculative = target_size_hint
                .saturating_sub(self.capacity)
                .min(FILE_ALLOCATE_SIZE);
            let alloc = shortfall.max(speculative);
            if let Err(e) = self.writer.allocate(self.capacity, alloc) {
                warn!("log file allocation failed: {e}");
            }
            self.capacity += alloc;
        }
        if let Err(write_err) = self.writer.write_all(buf) {
            // A failed `write_all` may have written part of `buf`; go back so
            // that the next write starts at the last known-good offset.
            if let Err(e) = self.writer.seek(SeekFrom::Start(self.written as u64)) {
                panic!("failed to reseek after write failure: {}", e);
            }
            return Err(write_err);
        }
        self.written = end;
        Ok(())
    }

    /// Syncs the file through its handle.
    ///
    /// # Panics
    ///
    /// Panics if the sync fails, so this never actually returns `Err`.
    /// NOTE(review): presumably deliberate, so that a failed sync can never
    /// be silently ignored by a caller; confirm before changing.
    pub fn sync(&mut self) -> IoResult<()> {
        let _t = StopWatch::new(&*LOG_SYNC_DURATION_HISTOGRAM);
        self.handle.sync().unwrap();
        Ok(())
    }

    /// Returns the number of bytes written so far, i.e. the offset at which
    /// the next write will land.
    #[inline]
    pub fn offset(&self) -> usize {
        self.written
    }
}
/// Builds a [`LogFileReader`] for the file behind `handle`.
///
/// # Errors
///
/// Returns an error if the file system cannot create a reader for the
/// handle.
pub(super) fn build_file_reader<F: FileSystem>(
    system: &F,
    handle: Arc<F::Handle>,
) -> Result<LogFileReader<F>> {
    // The file system takes its own reference; the original one is kept by
    // the returned reader.
    let file_reader = system.new_reader(Arc::clone(&handle))?;
    let log_reader = LogFileReader::open(handle, file_reader);
    Ok(log_reader)
}
/// Reader for a single log file that remembers its current position so that
/// consecutive reads at adjacent offsets do not need to seek.
pub struct LogFileReader<F: FileSystem> {
/// Shared handle of the underlying file; used to query the file size.
handle: Arc<F::Handle>,
/// File-system reader that performs the actual seeks and reads.
reader: F::Reader,
/// Position the reader is believed to be at. Starts as `u64::MAX`, which
/// makes the first `read_to` seek (for any requested offset other than
/// `u64::MAX`).
offset: u64,
}
impl<F: FileSystem> LogFileReader<F> {
    /// Wraps `reader` for the file behind `handle`.
    ///
    /// The cached position starts at `u64::MAX` so that the first
    /// [`read_to`](Self::read_to) performs a seek.
    fn open(handle: Arc<F::Handle>, reader: F::Reader) -> LogFileReader<F> {
        Self {
            handle,
            reader,
            offset: u64::MAX,
        }
    }

    /// Reads the start of the file and decodes it as a [`LogFileFormat`].
    ///
    /// Up to `LogFileFormat::max_encoded_len()` bytes are read from offset
    /// 0, which moves the reader's position to just past those bytes.
    ///
    /// # Errors
    ///
    /// Returns an error if reading fails or the bytes cannot be decoded.
    pub fn parse_format(&mut self) -> Result<LogFileFormat> {
        let mut container = vec![0; LogFileFormat::max_encoded_len()];
        let size = self.read_to(0, &mut container)?;
        // The file may be shorter than the largest possible header.
        container.truncate(size);
        LogFileFormat::decode(&mut container.as_slice())
    }

    /// Reads the block described by `handle` into a new buffer.
    ///
    /// The returned buffer is shorter than `handle.len` if the end of the
    /// file is reached first.
    pub fn read(&mut self, handle: FileBlockHandle) -> Result<Vec<u8>> {
        let mut buf = vec![0; handle.len];
        let size = self.read_to(handle.offset, &mut buf)?;
        buf.truncate(size);
        Ok(buf)
    }

    /// Reads from `offset` into `buf` until `buf` is full or the end of the
    /// file is reached, and returns the number of bytes read.
    ///
    /// A seek is performed only when `offset` differs from the reader's
    /// cached position. Reads interrupted with `ErrorKind::Interrupted` are
    /// retried.
    ///
    /// # Errors
    ///
    /// Returns an error if seeking fails, or `Error::Io` if a read fails
    /// with any error other than `Interrupted`.
    pub fn read_to(&mut self, offset: u64, mut buf: &mut [u8]) -> Result<usize> {
        if offset != self.offset {
            // `Seek` makes no promise about the stream position after a
            // failed seek, so drop the cached position first and record the
            // new one only once the seek has succeeded.
            self.offset = u64::MAX;
            self.reader.seek(SeekFrom::Start(offset))?;
            self.offset = offset;
        }
        // Stop as soon as the buffer is full instead of issuing one more
        // zero-length read just to observe `Ok(0)`.
        while !buf.is_empty() {
            match self.reader.read(buf) {
                // End of file.
                Ok(0) => break,
                Ok(n) => {
                    self.offset += n as u64;
                    buf = &mut buf[n..];
                }
                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => return Err(Error::Io(e)),
            }
        }
        Ok((self.offset - offset) as usize)
    }

    /// Returns the current size of the underlying file as reported by its
    /// handle.
    #[inline]
    pub fn file_size(&self) -> Result<usize> {
        Ok(self.handle.file_size()?)
    }
}