value_log/segment/
multi_writer.rsuse super::writer::Writer;
use crate::{
compression::Compressor,
id::{IdGenerator, SegmentId},
ValueHandle,
};
use std::path::{Path, PathBuf};
pub struct MultiWriter<C: Compressor + Clone> {
folder: PathBuf,
target_size: u64,
writers: Vec<Writer<C>>,
id_generator: IdGenerator,
compression: Option<C>,
}
impl<C: Compressor + Clone> MultiWriter<C> {
#[doc(hidden)]
pub fn new<P: AsRef<Path>>(
id_generator: IdGenerator,
target_size: u64,
folder: P,
) -> std::io::Result<Self> {
let folder = folder.as_ref();
let segment_id = id_generator.next();
let segment_path = folder.join(segment_id.to_string());
Ok(Self {
id_generator,
folder: folder.into(),
target_size,
writers: vec![Writer::new(segment_path, segment_id)?],
compression: None,
})
}
#[must_use]
#[doc(hidden)]
pub fn use_compression(mut self, compressor: C) -> Self {
self.compression = Some(compressor.clone());
self.get_active_writer_mut().compression = Some(compressor);
self
}
#[doc(hidden)]
#[must_use]
pub fn get_active_writer(&self) -> &Writer<C> {
#[allow(clippy::expect_used)]
self.writers.last().expect("should exist")
}
fn get_active_writer_mut(&mut self) -> &mut Writer<C> {
#[allow(clippy::expect_used)]
self.writers.last_mut().expect("should exist")
}
#[must_use]
pub fn get_next_value_handle(&self) -> ValueHandle {
ValueHandle {
offset: self.offset(),
segment_id: self.segment_id(),
}
}
#[doc(hidden)]
#[must_use]
pub fn offset(&self) -> u64 {
self.get_active_writer().offset()
}
#[must_use]
fn segment_id(&self) -> SegmentId {
self.get_active_writer().segment_id()
}
fn rotate(&mut self) -> crate::Result<()> {
log::debug!("Rotating segment writer");
let new_segment_id = self.id_generator.next();
let segment_path = self.folder.join(new_segment_id.to_string());
let new_writer =
Writer::new(segment_path, new_segment_id)?.use_compression(self.compression.clone());
self.writers.push(new_writer);
Ok(())
}
pub fn write<K: AsRef<[u8]>, V: AsRef<[u8]>>(
&mut self,
key: K,
value: V,
) -> crate::Result<u32> {
let key = key.as_ref();
let value = value.as_ref();
let target_size = self.target_size;
let writer = self.get_active_writer_mut();
let bytes_written = writer.write(key, value)?;
if writer.offset() >= target_size {
writer.flush()?;
self.rotate()?;
}
Ok(bytes_written)
}
pub(crate) fn finish(mut self) -> crate::Result<Vec<Writer<C>>> {
let writer = self.get_active_writer_mut();
if writer.item_count > 0 {
writer.flush()?;
}
Ok(self.writers)
}
}