use super::writer::Writer;
use crate::{
id::{IdGenerator, SegmentId},
CompressionType, IndexWriter, ValueHandle,
};
use std::path::{Path, PathBuf};
/// Writes key-value pairs into a sequence of segment files, starting a new
/// segment whenever the active one reaches `target_size`.
///
/// Every write is also announced to the wrapped [`IndexWriter`] as an
/// indirection (key => value handle), so the index can later locate the value.
pub struct MultiWriter<W: IndexWriter> {
/// Directory in which the segment files are created (one file per segment ID).
folder: PathBuf,
/// Offset threshold: once the active writer's offset reaches this value,
/// it is flushed and a new segment writer is started.
target_size: u64,
/// All segment writers created so far; the last element is the active one.
writers: Vec<Writer>,
/// Receives an indirection for every key-value pair that is written.
pub(crate) index_writer: W,
/// Source of the segment IDs handed to newly created writers.
id_generator: IdGenerator,
/// Compression setting passed on to every segment writer.
compression: CompressionType,
}
impl<W: IndexWriter> MultiWriter<W> {
    /// Initializes a new multi writer with a single, freshly created segment writer.
    ///
    /// The first segment ID is drawn from `id_generator`, and the segment file
    /// is placed at `folder/<segment_id>`.
    ///
    /// # Errors
    ///
    /// Returns an error if the first segment writer cannot be created.
    #[doc(hidden)]
    pub fn new<P: AsRef<Path>>(
        id_generator: IdGenerator,
        target_size: u64,
        folder: P,
        index_writer: W,
        compression: CompressionType,
    ) -> std::io::Result<Self> {
        let folder = folder.as_ref();

        let segment_id = id_generator.next();
        let segment_path = folder.join(segment_id.to_string());

        Ok(Self {
            id_generator,
            folder: folder.into(),
            target_size,
            writers: vec![Writer::new(segment_path, segment_id, compression)?],
            index_writer,
            compression,
        })
    }

    /// Returns the writer that currently receives new items (the most recently created one).
    fn get_active_writer(&self) -> &Writer {
        // Invariant: `new` creates one writer and nothing ever removes writers
        self.writers.last().expect("should exist")
    }

    /// Mutable counterpart of [`Self::get_active_writer`].
    fn get_active_writer_mut(&mut self) -> &mut Writer {
        // Invariant: `new` creates one writer and nothing ever removes writers
        self.writers.last_mut().expect("should exist")
    }

    /// Returns the offset that is stored in the value handle for the next item
    /// written with the given `key`.
    ///
    /// This is the active writer's current offset, advanced by
    /// `size_of::<u16>()` plus the length of `key`.
    #[must_use]
    pub(crate) fn offset(&self, key: &[u8]) -> u64 {
        // NOTE(review): presumably this skips a u16 key-length prefix and the key
        // bytes so the handle points at the value part of the record - confirm
        // against the on-disk layout produced by `Writer::write`
        self.get_active_writer().offset()
            + std::mem::size_of::<u16>() as u64
            + key.len() as u64
    }

    /// Returns the segment ID of the active writer.
    #[must_use]
    pub(crate) fn segment_id(&self) -> SegmentId {
        self.get_active_writer().segment_id()
    }

    /// Starts a new segment writer, which becomes the active writer.
    ///
    /// The caller is responsible for flushing the previously active writer.
    ///
    /// # Errors
    ///
    /// Returns an error if the new segment writer cannot be created.
    fn rotate(&mut self) -> crate::Result<()> {
        log::debug!("Rotating segment writer");

        let new_segment_id = self.id_generator.next();

        // Each segment gets its own file inside the folder, exactly like the first
        // segment created in `new`; the writer must not be pointed at the folder itself
        let segment_path = self.folder.join(new_segment_id.to_string());

        self.writers
            .push(Writer::new(segment_path, new_segment_id, self.compression)?);

        Ok(())
    }

    /// Writes an item.
    ///
    /// The value handle (active segment ID + offset) is first registered with
    /// the index writer, then the key-value pair is appended to the active
    /// segment. If the active writer's offset has reached the target size
    /// afterwards, it is flushed and a new segment writer is started.
    ///
    /// Returns the value reported by the segment writer's `write` call
    /// (the number of bytes written, going by its use here).
    ///
    /// # Errors
    ///
    /// Returns an error if registering the indirection, writing or flushing the
    /// segment, or starting the next segment writer fails.
    pub fn write<K: AsRef<[u8]>, V: AsRef<[u8]>>(
        &mut self,
        key: K,
        value: V,
    ) -> crate::Result<u32> {
        let key = key.as_ref();
        let value = value.as_ref();

        // Copied out up front so it can be compared while the active writer is
        // mutably borrowed below
        let target_size = self.target_size;

        // Give the value handle to the index writer; it has to be computed
        // before the write advances the active writer's offset
        let segment_id = self.segment_id();
        let offset = self.offset(key);
        let vhandle = ValueHandle { segment_id, offset };

        log::trace!(
            "GC: inserting indirection: {segment_id:?}:{offset:?} => {:?}",
            String::from_utf8_lossy(key)
        );

        // NOTE(review): `as u32` silently truncates lengths above `u32::MAX` -
        // confirm that value sizes are limited upstream
        self.index_writer
            .insert_indirection(key, vhandle, value.len() as u32)?;

        // Write the actual item into the active segment
        let writer = self.get_active_writer_mut();
        let bytes_written = writer.write(key, value)?;

        // Check the segment size target, maybe rotate to the next writer
        if writer.offset() >= target_size {
            writer.flush()?;
            self.rotate()?;
        }

        Ok(bytes_written)
    }

    /// Flushes the active writer if it holds any items, then hands back all
    /// segment writers together with the index writer.
    ///
    /// Only the active writer needs flushing here: every earlier writer was
    /// already flushed in `write` right before it was rotated out. Nothing is
    /// called on the index writer; it is returned as-is.
    ///
    /// # Errors
    ///
    /// Returns an error if flushing the active writer fails.
    pub(crate) fn finish(mut self) -> crate::Result<(Vec<Writer>, W)> {
        let writer = self.get_active_writer_mut();

        if writer.item_count > 0 {
            writer.flush()?;
        }

        Ok((self.writers, self.index_writer))
    }
}