use crate::{
blob_tree::handle::BlobIndirection, file::BLOBS_FOLDER, table::Table,
tree::ingest::Ingestion as TableIngestion, vlog::BlobFileWriter, SeqNo, UserKey, UserValue,
};
/// Bulk-ingestion writer for a [`crate::BlobTree`].
///
/// Keys must be written in strictly ascending order. Values at or above the
/// configured separation threshold are written to blob files and referenced
/// from the tables by an indirection; smaller values are written inline.
/// Nothing becomes visible in the tree until `finish` is called.
pub struct BlobIngestion<'a> {
// Tree being ingested into; `finish` reads its index, config and locks.
tree: &'a crate::BlobTree,
// Writer for the table side (inline values, indirections, tombstones).
pub(crate) table: TableIngestion<'a>,
// Writer for separated (large) values.
pub(crate) blob: BlobFileWriter,
// Sequence number passed to the blob writer; set to 0 in `new` and not
// modified by the methods in this impl.
seqno: SeqNo,
// Values whose byte length is >= this are separated into blob files.
separation_threshold: u32,
// Last key that was written successfully; enforces strictly ascending order.
last_key: Option<UserKey>,
}
impl<'a> BlobIngestion<'a> {
    /// Creates a bulk-ingestion writer for `tree`.
    ///
    /// This sets up two writers:
    /// - a table ingestion on the tree's index;
    /// - a blob file writer that writes into the tree's `BLOBS_FOLDER`.
    ///
    /// The blob writer uses the target file size and compression from the
    /// tree's kv-separation options. The separation threshold is taken from
    /// the same options.
    ///
    /// # Errors
    ///
    /// Returns an error if the table ingestion or the blob file writer cannot
    /// be created.
    ///
    /// # Panics
    ///
    /// Panics if the tree's config has no kv-separation options.
    pub fn new(tree: &'a crate::BlobTree) -> crate::Result<Self> {
        #[expect(
            clippy::expect_used,
            reason = "cannot define blob tree without kv separation options"
        )]
        let kv = tree
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist");

        let table = TableIngestion::new(&tree.index)?;

        let blob = BlobFileWriter::new(
            tree.index.0.blob_file_id_counter.clone(),
            tree.index.config.path.join(BLOBS_FOLDER),
            tree.index.id,
            tree.index.config.descriptor_table.clone(),
        )?
        .use_target_size(kv.file_target_size)
        .use_compression(kv.compression);

        Ok(Self {
            tree,
            table,
            blob,
            seqno: 0,
            separation_threshold: kv.separation_threshold,
            last_key: None,
        })
    }

    /// Writes a key/value pair.
    ///
    /// - If `value.len()` is at least the separation threshold, the value is
    ///   appended to the blob file. The table then receives an indirection
    ///   holding the blob handle and the value size.
    /// - Otherwise the value is written inline into the table.
    ///
    /// # Errors
    ///
    /// Returns an error if the blob or table write fails. In that case the
    /// key is not recorded as written, so it may be retried.
    ///
    /// # Panics
    ///
    /// Panics if `key` is not strictly greater than the last key written
    /// successfully.
    pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> {
        self.assert_ascending(&key);

        // NOTE(review): `as u32` silently wraps for values >= 4 GiB. This
        // presumably relies on a value-size limit enforced elsewhere; confirm.
        #[expect(clippy::cast_possible_truncation)]
        let value_size = value.len() as u32;

        if value_size >= self.separation_threshold {
            let vhandle = self.blob.write(&key, self.seqno, &value)?;
            let indirection = BlobIndirection {
                vhandle,
                size: value_size,
            };
            self.table.write_indirection(key.clone(), indirection)?;
        } else {
            self.table.write(key.clone(), value)?;
        }

        self.last_key = Some(key);
        Ok(())
    }

    /// Writes a tombstone for `key` through the table ingestion.
    ///
    /// # Errors
    ///
    /// Returns an error if the table write fails. The key is then not
    /// recorded as written.
    ///
    /// # Panics
    ///
    /// Panics if `key` is not strictly greater than the last key written
    /// successfully.
    pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> {
        self.assert_ascending(&key);
        self.table.write_tombstone(key.clone())?;
        self.last_key = Some(key);
        Ok(())
    }

    /// Writes a weak tombstone for `key` through the table ingestion.
    ///
    /// # Errors
    ///
    /// Returns an error if the table write fails. The key is then not
    /// recorded as written.
    ///
    /// # Panics
    ///
    /// Panics if `key` is not strictly greater than the last key written
    /// successfully.
    pub fn write_weak_tombstone(&mut self, key: UserKey) -> crate::Result<()> {
        self.assert_ascending(&key);
        self.table.write_weak_tombstone(key.clone())?;
        self.last_key = Some(key);
        Ok(())
    }

    /// Finalizes the ingestion and publishes its output in a new tree version.
    ///
    /// Steps, in order:
    /// 1. Take the flush lock, rotate the active memtable and flush it.
    /// 2. Finish the blob file writer, then the table writer.
    /// 3. Take the compaction-state lock and the version-history write lock.
    /// 4. While holding both locks:
    ///    - draw a new global sequence number;
    ///    - recover every written table with that number;
    ///    - install the tables and the blob files as a new L0 run.
    /// 5. Run version maintenance. A failure here is logged, not returned.
    ///
    /// # Errors
    ///
    /// Returns an error if any of these fail:
    /// - the memtable flush;
    /// - finishing either writer;
    /// - recovering a table;
    /// - upgrading the version.
    ///
    /// # Panics
    ///
    /// Panics if the compaction-state or version-history lock is poisoned.
    // The lock guards are deliberately held until the function returns.
    // NOTE(review): presumably this keeps compactions and readers from seeing
    // a partially installed version; confirm before tightening the drops.
    #[allow(clippy::significant_drop_tightening)]
    pub fn finish(self) -> crate::Result<()> {
        use crate::AbstractTree;

        let index = self.index().clone();

        let flush_lock = index.get_flush_lock();
        index.rotate_memtable();
        index.flush(&flush_lock, 0)?;

        let blob_files = self.blob.finish()?;
        let results = self.table.writer.finish()?;

        // Held (unused) for the rest of the function; only the guard matters.
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let _compaction_state = index.compaction_state.lock().expect("lock is poisoned");

        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut version_lock = index.version_history.write().expect("lock is poisoned");

        let global_seqno = index.config.seqno.next();

        let created_tables = results
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    index
                        .config
                        .path
                        .join(crate::file::TABLES_FOLDER)
                        .join(table_id.to_string()),
                    checksum,
                    global_seqno,
                    index.id,
                    index.config.cache.clone(),
                    index.config.descriptor_table.clone(),
                    false,
                    false,
                    #[cfg(feature = "metrics")]
                    index.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        version_lock.upgrade_version_with_seqno(
            &index.config.path,
            |current| {
                let mut copy = current.clone();
                copy.version =
                    copy.version
                        .with_new_l0_run(&created_tables, Some(&blob_files), None);
                Ok(copy)
            },
            global_seqno,
            &index.config.visible_seqno,
        )?;

        if let Err(e) = version_lock.maintenance(&index.config.path, 0) {
            log::warn!("Version GC failed: {e:?}");
        }

        Ok(())
    }

    /// Panics unless `key` sorts strictly after the last key written
    /// successfully.
    ///
    /// `#[track_caller]` makes the panic location point at the public write
    /// method that was misused.
    #[track_caller]
    fn assert_ascending(&self, key: &UserKey) {
        if let Some(prev) = &self.last_key {
            assert!(
                key > prev,
                "next key in ingestion must be greater than last key"
            );
        }
    }

    /// Returns the underlying LSM tree that holds the tables.
    #[inline]
    fn index(&self) -> &crate::Tree {
        &self.tree.index
    }
}