use super::Tree;
use crate::{
BlobIndirection, SeqNo, UserKey, UserValue, config::FilterPolicyEntry, fs::Fs,
table::multi_writer::MultiWriter,
};
use std::cmp::Ordering;
use std::path::PathBuf;
use std::sync::Arc;
/// Level index whose per-level configuration policies (block sizes, restart
/// intervals, compression, index/filter partitioning, bloom filter policy) are
/// applied to tables written by [`Ingestion`].
///
/// Note that the ingested tables themselves are written into the level-0 tables
/// folder and installed as a new L0 run; only the policy lookups use this level.
pub const INITIAL_CANONICAL_LEVEL: usize = 1;
/// Bulk-ingestion session that writes pre-sorted entries directly into new
/// table files and, on `finish`, installs them into the tree as a new L0 run.
///
/// Keys must be written in strictly increasing order according to the tree's
/// configured comparator.
pub struct Ingestion<'a> {
/// Directory the ingested table files are written into (the tree's level-0
/// tables folder); table paths are built by joining the table id onto it.
pub(crate) folder: PathBuf,
/// Filesystem handle associated with `folder`, used both by the writer and
/// when recovering the finished tables.
pub(crate) level_fs: Arc<dyn Fs>,
/// The tree being ingested into.
tree: &'a Tree,
/// Writer that produces the table files.
pub(crate) writer: MultiWriter,
/// Sequence number stamped on every written entry. It is initialized to 0 and
/// never changed in this file; `finish` recovers the tables with a freshly
/// allocated global seqno (NOTE(review): presumably that overrides this value — confirm).
seqno: SeqNo,
/// Most recently written key. Used to enforce strict key ordering and to
/// detect an empty ingestion in `finish`.
last_key: Option<UserKey>,
}
impl<'a> Ingestion<'a> {
    /// Starts a new ingestion session for `tree`.
    ///
    /// Table files are written into the tree's level-0 tables folder, while all
    /// block, filter and compression settings are looked up from the per-level
    /// policies at [`INITIAL_CANONICAL_LEVEL`].
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying [`MultiWriter`] cannot be created.
    pub fn new(tree: &'a Tree) -> crate::Result<Self> {
        let (folder, level_fs) = tree.config.tables_folder_for_level(0);
        log::debug!("Ingesting into tables in {}", folder.display());

        let index_partitioning = tree
            .config
            .index_block_partitioning_policy
            .get(INITIAL_CANONICAL_LEVEL);
        let filter_partitioning = tree
            .config
            .filter_block_partitioning_policy
            .get(INITIAL_CANONICAL_LEVEL);

        // NOTE(review): `64 MiB` and `6` are passed positionally to MultiWriter::new;
        // presumably a target table size and a level hint — confirm against its signature.
        let mut writer = MultiWriter::new(
            folder.clone(),
            tree.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            6,
            level_fs.clone(),
        )?
        .set_comparator(tree.config.comparator.clone())
        .use_bloom_policy({
            // With `expect_point_read_hits` set, bloom filters are disabled
            // (0 bits per key); otherwise the configured bloom policy is used,
            // falling back to 0 bits per key for any non-bloom filter entry.
            if tree.config.expect_point_read_hits {
                crate::config::BloomConstructionPolicy::BitsPerKey(0.0)
            } else if let FilterPolicyEntry::Bloom(p) =
                tree.config.filter_policy.get(INITIAL_CANONICAL_LEVEL)
            {
                p
            } else {
                crate::config::BloomConstructionPolicy::BitsPerKey(0.0)
            }
        })
        .use_data_block_size(
            tree.config
                .data_block_size_policy
                .get(INITIAL_CANONICAL_LEVEL),
        )
        .use_data_block_hash_ratio(
            tree.config
                .data_block_hash_ratio_policy
                .get(INITIAL_CANONICAL_LEVEL),
        )
        .use_data_block_compression(
            tree.config
                .data_block_compression_policy
                .get(INITIAL_CANONICAL_LEVEL),
        )
        .use_index_block_compression(
            tree.config
                .index_block_compression_policy
                .get(INITIAL_CANONICAL_LEVEL),
        )
        .use_data_block_restart_interval(
            tree.config
                .data_block_restart_interval_policy
                .get(INITIAL_CANONICAL_LEVEL),
        )
        .use_index_block_restart_interval(
            tree.config
                .index_block_restart_interval_policy
                .get(INITIAL_CANONICAL_LEVEL),
        );

        if index_partitioning {
            writer = writer.use_partitioned_index();
        }
        if filter_partitioning {
            writer = writer.use_partitioned_filter();
        }

        writer = writer.use_prefix_extractor(tree.config.prefix_extractor.clone());
        writer = writer.use_encryption(tree.config.encryption.clone());

        #[cfg(zstd_any)]
        {
            writer = writer.use_zstd_dictionary(tree.config.zstd_dictionary.clone());
        }

        Ok(Self {
            folder,
            level_fs,
            tree,
            writer,
            seqno: 0,
            last_key: None,
        })
    }

    /// Enforces the ingestion ordering invariant.
    ///
    /// # Panics
    ///
    /// Panics if a key was already written and `key` does not sort strictly
    /// after it according to the tree's configured comparator (this also
    /// rejects duplicate keys).
    fn assert_key_order(&self, key: &UserKey) {
        if let Some(prev) = &self.last_key {
            assert!(
                self.tree.config.comparator.compare(prev, key) == Ordering::Less,
                "next key in ingestion must be ordered after last key by configured comparator"
            );
        }
    }

    /// Appends an entry whose value is an encoded blob indirection and
    /// registers the indirection with the writer.
    ///
    /// # Panics
    ///
    /// Panics if `key` is not strictly greater than the previously written key.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying table writer fails.
    pub(crate) fn write_indirection(
        &mut self,
        key: UserKey,
        indirection: BlobIndirection,
    ) -> crate::Result<()> {
        use crate::coding::Encode;

        self.assert_key_order(&key);

        self.writer.write(crate::InternalValue::from_components(
            key.clone(),
            indirection.encode_into_vec(),
            self.seqno,
            crate::ValueType::Indirection,
        ))?;
        self.writer.register_blob(indirection);

        self.last_key = Some(key);
        Ok(())
    }

    /// Appends a regular key-value pair.
    ///
    /// # Panics
    ///
    /// Panics if `key` is not strictly greater than the previously written key.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying table writer fails.
    pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> {
        self.assert_key_order(&key);

        self.writer.write(crate::InternalValue::from_components(
            key.clone(),
            value,
            self.seqno,
            crate::ValueType::Value,
        ))?;

        self.last_key = Some(key);
        Ok(())
    }

    /// Appends a tombstone (`ValueType::Tombstone`, empty value) for `key`.
    ///
    /// # Panics
    ///
    /// Panics if `key` is not strictly greater than the previously written key.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying table writer fails.
    pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> {
        self.assert_key_order(&key);

        self.writer.write(crate::InternalValue::from_components(
            key.clone(),
            crate::UserValue::empty(),
            self.seqno,
            crate::ValueType::Tombstone,
        ))?;

        self.last_key = Some(key);
        Ok(())
    }

    /// Appends a weak tombstone (`ValueType::WeakTombstone`, empty value) for `key`.
    ///
    /// # Panics
    ///
    /// Panics if `key` is not strictly greater than the previously written key.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying table writer fails.
    pub fn write_weak_tombstone(&mut self, key: UserKey) -> crate::Result<()> {
        self.assert_key_order(&key);

        self.writer.write(crate::InternalValue::from_components(
            key.clone(),
            crate::UserValue::empty(),
            self.seqno,
            crate::ValueType::WeakTombstone,
        ))?;

        self.last_key = Some(key);
        Ok(())
    }

    /// Finishes the ingestion and installs the written tables as a new L0 run.
    ///
    /// If no entry was written this returns `Ok(())` without touching the tree.
    /// Otherwise it:
    ///
    /// 1. rotates and flushes the active memtable,
    /// 2. finishes the table writer,
    /// 3. recovers each produced table with a freshly allocated global seqno, and
    /// 4. upgrades the tree version to include them.
    ///
    /// A failure of the subsequent version maintenance (GC) is only logged.
    ///
    /// # Errors
    ///
    /// Returns an error if flushing, finishing the writer, recovering a table,
    /// or upgrading the version fails.
    #[allow(
        clippy::significant_drop_tightening,
        reason = "the compaction-state guard is intentionally held until the new version is installed"
    )]
    pub fn finish(self) -> crate::Result<()> {
        use crate::{AbstractTree, Table};

        if self.last_key.is_none() {
            log::trace!("No data written to Ingestion, returning early");
            return Ok(());
        }

        // Flush the current memtable contents before the ingested tables are installed.
        let flush_lock = self.tree.get_flush_lock();
        self.tree.rotate_memtable();
        self.tree.flush(&flush_lock, 0)?;

        let results = self.writer.finish()?;
        log::info!("Finished ingestion writer");

        // Bound to an underscore-prefixed name (not `_`) so the guard is NOT
        // dropped immediately: the compaction state stays locked until this
        // function returns.
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let _compaction_state = self.tree.compaction_state.lock().expect("lock is poisoned");

        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut version_lock = self.tree.version_history.write().expect("lock is poisoned");

        // One seqno shared by every table produced by this ingestion.
        let global_seqno = self.tree.config.seqno.next();

        let created_tables = results
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    self.folder.join(table_id.to_string()),
                    checksum,
                    global_seqno,
                    self.tree.id,
                    self.tree.config.cache.clone(),
                    self.tree.config.descriptor_table.clone(),
                    self.level_fs.clone(),
                    false,
                    false,
                    self.tree.config.encryption.clone(),
                    #[cfg(zstd_any)]
                    self.tree.config.zstd_dictionary.clone(),
                    self.tree.config.comparator.clone(),
                    #[cfg(feature = "metrics")]
                    self.tree.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        version_lock.upgrade_version_with_seqno(
            &self.tree.config.path,
            |current| {
                let mut copy = current.clone();
                let ctx =
                    crate::version::TransformContext::new(self.tree.config.comparator.as_ref());
                copy.version = copy
                    .version
                    .with_new_l0_run(&created_tables, None, None, &ctx);
                Ok(copy)
            },
            global_seqno,
            &self.tree.config.visible_seqno,
            &*self.tree.config.fs,
        )?;

        // Best-effort: the new version is already installed, so a GC failure
        // must not fail the ingestion.
        if let Err(e) = version_lock.maintenance(&self.tree.config.path, 0, &*self.tree.config.fs) {
            log::warn!("Version GC failed: {e:?}");
        }

        Ok(())
    }
}