use super::{meta::Metadata, trailer::SegmentFileTrailer};
use crate::{coding::Encode, compression::Compressor, id::SegmentId, key_range::KeyRange, UserKey};
use byteorder::{BigEndian, WriteBytesExt};
use std::{
fs::File,
io::{BufWriter, Seek, Write},
path::{Path, PathBuf},
};
/// Magic bytes prefixed to every blob record on disk: the ASCII tag
/// `VLGBLOB` followed by a single format-version byte (currently `1`).
pub const BLOB_HEADER_MAGIC: &[u8] = &[b'V', b'L', b'G', b'B', b'L', b'O', b'B', 1];
/// Streaming writer for a single blob segment file.
///
/// Records are appended via [`Writer::write`]; statistics needed for the
/// segment's metadata and trailer are accumulated along the way and written
/// out by `flush`.
pub struct Writer<C: Compressor + Clone> {
    /// Path of the segment file being written.
    pub path: PathBuf,

    /// ID of the segment this writer produces.
    pub(crate) segment_id: SegmentId,

    #[allow(clippy::struct_field_names)]
    // Buffered handle to the underlying segment file.
    active_writer: BufWriter<File>,

    // Logical number of bytes emitted so far; maintained manually by `write`
    // instead of querying the file position on every record.
    offset: u64,

    /// Number of records written so far.
    pub(crate) item_count: u64,

    /// Total bytes of value payloads as stored on disk (post-compression).
    pub(crate) written_blob_bytes: u64,

    /// Total bytes of value payloads before compression.
    pub(crate) uncompressed_bytes: u64,

    /// First key written (keys are expected in ascending order by the caller
    /// so this bounds the key range — NOTE(review): ordering is not enforced
    /// here; confirm against callers).
    pub(crate) first_key: Option<UserKey>,

    /// Most recently written key.
    pub(crate) last_key: Option<UserKey>,

    /// Optional compressor applied to values (never to keys); `None` stores
    /// values verbatim.
    pub(crate) compression: Option<C>,
}
impl<C: Compressor + Clone> Writer<C> {
    /// Creates a writer for a new segment file at `path`.
    ///
    /// Creates (or truncates) the file. Compression is disabled until
    /// [`Writer::use_compression`] is called.
    ///
    /// # Errors
    ///
    /// Returns an I/O error if the file cannot be created.
    #[doc(hidden)]
    pub fn new<P: AsRef<Path>>(path: P, segment_id: SegmentId) -> std::io::Result<Self> {
        let path = path.as_ref();
        let file = File::create(path)?;
        Ok(Self {
            path: path.into(),
            segment_id,
            active_writer: BufWriter::new(file),
            offset: 0,
            item_count: 0,
            written_blob_bytes: 0,
            uncompressed_bytes: 0,
            first_key: None,
            last_key: None,
            compression: None,
        })
    }

    /// Sets (or clears, with `None`) the compressor used for value payloads.
    ///
    /// Builder-style: consumes and returns `self`.
    pub fn use_compression(mut self, compressor: Option<C>) -> Self {
        self.compression = compressor;
        self
    }

    /// Returns the current logical write offset in bytes (bytes emitted so
    /// far, including record headers).
    #[must_use]
    pub(crate) fn offset(&self) -> u64 {
        self.offset
    }

    /// Returns the ID of the segment being written.
    #[must_use]
    pub(crate) fn segment_id(&self) -> SegmentId {
        self.segment_id
    }

    /// Appends one key-value blob record to the segment file.
    ///
    /// On-disk record layout (integers big-endian):
    ///
    /// `magic (8 B) | XXH3 checksum (u64) | key len (u16) | key | value len (u32) | value`
    ///
    /// The checksum covers the key bytes followed by the *stored* (i.e.
    /// possibly compressed) value bytes. Returns the stored value length in
    /// bytes.
    ///
    /// # Panics
    ///
    /// Panics if `key` is empty, if `key` is longer than `u16::MAX` bytes, or
    /// if the (pre-compression) value length does not fit in a `u32`.
    ///
    /// # Errors
    ///
    /// Returns an error if compression or writing fails.
    pub fn write(&mut self, key: &[u8], value: &[u8]) -> crate::Result<u32> {
        // Preconditions: non-empty key that fits the u16 length field, and a
        // value length that fits the u32 length field.
        assert!(!key.is_empty());
        assert!(key.len() <= u16::MAX.into());
        assert!(u32::try_from(value.len()).is_ok());

        // Track the key range: first key only once, last key on every write.
        if self.first_key.is_none() {
            self.first_key = Some(key.into());
        }
        self.last_key = Some(key.into());

        // Uncompressed size is recorded before the value is (possibly)
        // compressed; compressed size is recorded below.
        self.uncompressed_bytes += value.len() as u64;

        let value = match &self.compression {
            Some(compressor) => compressor.compress(value)?,
            None => value.to_vec(),
        };

        // Checksum is computed over key + stored (compressed) value, matching
        // what ends up on disk.
        let mut hasher = xxhash_rust::xxh3::Xxh3::new();
        hasher.update(key);
        hasher.update(&value);
        let checksum = hasher.digest();

        self.active_writer.write_all(BLOB_HEADER_MAGIC)?;
        self.active_writer.write_u64::<BigEndian>(checksum)?;

        // Truncation is safe: key length was asserted to fit in u16 above.
        #[allow(clippy::cast_possible_truncation)]
        self.active_writer
            .write_u16::<BigEndian>(key.len() as u16)?;
        self.active_writer.write_all(key)?;

        // NOTE(review): the u32 assertion above checked the *uncompressed*
        // length; this casts the compressed length — presumably compression
        // never expands past u32::MAX here, but worth confirming.
        #[allow(clippy::cast_possible_truncation)]
        self.active_writer
            .write_u32::<BigEndian>(value.len() as u32)?;
        self.active_writer.write_all(&value)?;

        // Mirror every byte just emitted in the logical offset:
        // magic + checksum (u64) + key len (u16) + key + value len (u32) + value.
        self.offset += BLOB_HEADER_MAGIC.len() as u64;
        self.offset += std::mem::size_of::<u64>() as u64;
        self.offset += std::mem::size_of::<u16>() as u64;
        self.offset += key.len() as u64;
        self.offset += std::mem::size_of::<u32>() as u64;
        self.offset += value.len() as u64;

        self.written_blob_bytes += value.len() as u64;
        self.item_count += 1;

        // Truncation is safe: stored length was written as u32 above.
        #[allow(clippy::cast_possible_truncation)]
        Ok(value.len() as u32)
    }

    /// Finalizes the segment: appends the metadata block and the trailer
    /// (which records where the metadata starts), then flushes the buffer and
    /// fsyncs the file.
    ///
    /// # Panics
    ///
    /// Panics if no item was ever written (no first/last key recorded).
    ///
    /// # Errors
    ///
    /// Returns an error if encoding, flushing, or syncing fails.
    pub(crate) fn flush(&mut self) -> crate::Result<()> {
        // Capture the position where metadata begins so the trailer can point
        // back to it.
        let metadata_ptr = self.active_writer.stream_position()?;

        let metadata = Metadata {
            item_count: self.item_count,
            compressed_bytes: self.written_blob_bytes,
            total_uncompressed_bytes: self.uncompressed_bytes,
            key_range: KeyRange::new((
                self.first_key
                    .clone()
                    .expect("should have written at least 1 item"),
                self.last_key
                    .clone()
                    .expect("should have written at least 1 item"),
            )),
        };
        metadata.encode_into(&mut self.active_writer)?;

        SegmentFileTrailer {
            metadata,
            metadata_ptr,
        }
        .encode_into(&mut self.active_writer)?;

        // Flush the BufWriter, then fsync so the segment is durable on disk.
        self.active_writer.flush()?;
        self.active_writer.get_mut().sync_all()?;

        Ok(())
    }
}