use std::io::{self, Seek, SeekFrom, Write};
use std::mem;
use super::BlockCompression;
#[cfg(feature = "deflate")]
use flate2::write::DeflateEncoder;
const MAX_DATA_BLOCK_SIZE: u64 = 64 * 1024;
enum Writer<S: Write> {
Raw(S),
#[cfg(feature = "deflate")]
Deflate(DeflateEncoder<S>),
#[cfg(feature = "lz4")]
Lz4(Box<lz4_flex::frame::FrameEncoder<S>>),
}
impl<W: Write> Writer<W> {
fn into_stream(self) -> io::Result<W> {
match self {
Writer::Raw(r) => Ok(r),
#[cfg(feature = "deflate")]
Writer::Deflate(d) => d.finish(),
#[cfg(feature = "lz4")]
Writer::Lz4(l) => l.finish().map_err(super::map_lz4_err),
}
}
fn get_stream(&mut self) -> &mut dyn Write {
match self {
Writer::Raw(r) => r,
#[cfg(feature = "deflate")]
Writer::Deflate(d) => d,
#[cfg(feature = "lz4")]
Writer::Lz4(l) => l,
}
}
}
enum BlockState<S: Write> {
Invalid,
Wait(S),
Active {
writer: Writer<S>,
block_id: u64,
offset: u64,
},
}
pub(crate) struct DataBlocksWriter<S: Write> {
state: BlockState<S>,
compression: BlockCompression,
}
impl<S: Write + Seek> DataBlocksWriter<S> {
pub(crate) fn new(stream: S, compression: BlockCompression) -> Self {
DataBlocksWriter {
state: BlockState::Wait(stream),
compression,
}
}
fn close_current(&mut self) -> io::Result<()> {
let (mut stream, block_id) = match mem::replace(&mut self.state, BlockState::Invalid) {
BlockState::Wait(stream) => (stream, !0),
BlockState::Active {
writer, block_id, ..
} => (writer.into_stream()?, block_id),
BlockState::Invalid => unreachable!(),
};
let current_position = stream.stream_position()?;
let len = current_position - (block_id + 1 + 4);
if len > 0 {
let len_bytes = u32::try_from(len)
.map_err(|_| {
io::Error::new(io::ErrorKind::Other, "block size can't be written as u32")
})?
.to_be_bytes();
stream.seek(SeekFrom::Start(block_id + 1))?;
stream.write_all(&len_bytes)?;
stream.seek(SeekFrom::Start(current_position))?;
}
self.state = BlockState::Wait(stream);
Ok(())
}
pub(crate) fn fragment(&mut self, size_hint: u64) -> io::Result<Fragment<impl Write + '_>> {
let current_offset = match &self.state {
BlockState::Active { offset, .. } => *offset,
_ => 0,
};
if size_hint == u64::MAX
|| (current_offset + size_hint > MAX_DATA_BLOCK_SIZE && current_offset > 0)
{
self.close_current()?;
}
if let BlockState::Wait(_) = self.state {
match mem::replace(&mut self.state, BlockState::Invalid) {
BlockState::Wait(mut stream) => {
let block_id = stream.stream_position()?;
stream.write_all(&[self.compression.tag() as u8, 0, 0, 0, 0])?;
let writer = match self.compression {
BlockCompression::None => Writer::Raw(stream),
#[cfg(feature = "deflate")]
BlockCompression::Deflate(level) => {
let encoder =
DeflateEncoder::new(stream, flate2::Compression::new(level));
Writer::Deflate(encoder)
}
#[cfg(feature = "lz4")]
BlockCompression::Lz4 => {
let encoder = lz4_flex::frame::FrameEncoder::new(stream);
Writer::Lz4(Box::new(encoder))
}
};
self.state = BlockState::Active {
writer,
block_id,
offset: 0,
};
}
_ => unreachable!(),
}
}
match &mut self.state {
BlockState::Active {
writer,
block_id,
offset,
} => {
let offset_copy = *offset;
let fragment = Fragment {
writer: writer.get_stream(),
writer_offset: offset,
block_id: *block_id,
offset: offset_copy,
};
Ok(fragment)
}
_ => unreachable!(),
}
}
pub(crate) fn finish(mut self) -> io::Result<S> {
self.close_current()?;
match self.state {
BlockState::Wait(stream) => Ok(stream),
_ => unreachable!(),
}
}
}
pub(crate) struct Fragment<'a, S> {
writer: S,
writer_offset: &'a mut u64,
block_id: u64,
offset: u64,
}
pub(crate) struct FragmentLocation {
pub(crate) block_id: u64,
pub(crate) offset: u64,
}
impl<S> Fragment<'_, S> {
pub(crate) fn location(self) -> FragmentLocation {
FragmentLocation {
block_id: self.block_id,
offset: self.offset,
}
}
}
impl<S: Write> Write for Fragment<'_, S> {
fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
let n = self.writer.write(buf)?;
*self.writer_offset += n as u64;
Ok(n)
}
fn flush(&mut self) -> Result<(), io::Error> {
self.writer.flush()
}
}