use std::sync::Arc;
use crate::db::PortableDatabaseState;
use crate::errors::{RainDBError, RainDBResult};
use crate::table_cache::TableCache;
use crate::tables::TableBuilder;
use crate::versioning::file_iterators::MergingIterator;
use crate::versioning::file_metadata::FileMetadata;
use super::manifest::CompactionManifest;
/// Bookkeeping for a single compaction run.
///
/// Tracks the table files the compaction has produced, the manifest describing the compaction,
/// and the table builder for the output file that is currently being written (if any).
pub(crate) struct CompactionState {
/// Metadata for every output file opened so far, in the order the files were opened. The last
/// element is the file currently being written.
output_files: Vec<FileMetadata>,
/// The manifest describing this compaction. It supplies the compaction level and the change
/// manifest that output files are registered in.
compaction_manifest: CompactionManifest,
/// Snapshot marker supplied at construction and handed back unchanged by
/// `get_smallest_snapshot`.
///
/// NOTE(review): presumably the smallest sequence number still visible to a live snapshot --
/// confirm against the caller.
smallest_snapshot: u64,
/// Running total of the file sizes reported by the table builder each time an output file is
/// finished.
pub(crate) total_size_bytes: u64,
/// Builder for the output file currently being written. `None` until an output file is opened
/// and again once that file has been finished.
table_builder: Option<TableBuilder>,
}
impl CompactionState {
    /// Create the state for the compaction described by `compaction_manifest`.
    ///
    /// The new state has no output files, no active table builder and a byte total of zero.
    pub(crate) fn new(compaction_manifest: CompactionManifest, smallest_snapshot: u64) -> Self {
        Self {
            table_builder: None,
            total_size_bytes: 0,
            output_files: Vec::new(),
            smallest_snapshot,
            compaction_manifest,
        }
    }

    /// Get a mutable reference to the metadata of the most recently opened output file.
    ///
    /// # Panics
    ///
    /// Panics if no output file has been opened yet.
    pub(crate) fn current_output_mut(&mut self) -> &mut FileMetadata {
        let newest_output = self.output_files.last_mut();
        newest_output.unwrap()
    }

    /// Get a shared reference to the manifest describing this compaction.
    pub(crate) fn compaction_manifest(&self) -> &CompactionManifest {
        let Self {
            compaction_manifest,
            ..
        } = self;
        compaction_manifest
    }

    /// Get a mutable reference to the manifest describing this compaction.
    pub(crate) fn compaction_manifest_mut(&mut self) -> &mut CompactionManifest {
        let Self {
            compaction_manifest,
            ..
        } = self;
        compaction_manifest
    }

    /// Returns `true` while a table builder is attached, i.e. an output file is open.
    pub(crate) fn has_table_builder(&self) -> bool {
        self.table_builder.is_some()
    }

    /// Get a mutable reference to the table builder of the currently open output file.
    ///
    /// # Panics
    ///
    /// Panics if there is no active table builder.
    pub(crate) fn table_builder_mut(&mut self) -> &mut TableBuilder {
        let active_builder = self.table_builder.as_mut();
        active_builder.unwrap()
    }

    /// Get the metadata of all output files opened so far, oldest first.
    pub(crate) fn get_output_files(&self) -> &[FileMetadata] {
        self.output_files.as_slice()
    }

    /// Get the smallest snapshot value this state was constructed with.
    pub(crate) fn get_smallest_snapshot(&self) -> u64 {
        self.smallest_snapshot
    }

    /// Get the combined file size recorded in the metadata of all output files.
    pub(crate) fn get_output_size(&self) -> u64 {
        let mut combined_size = 0;
        for metadata in &self.output_files {
            combined_size += metadata.get_file_size();
        }

        combined_size
    }

    /// Finish the output file that is currently being built and detach its table builder.
    ///
    /// If `iterator` reports an error the table builder is abandoned; otherwise it is finalized.
    /// In both cases the builder's reported file size is stored in the current output's
    /// metadata and added to `total_size_bytes`, and the builder is dropped before any error is
    /// returned.
    ///
    /// When the table has at least one entry it is looked up through `table_cache` and a summary
    /// is logged. The lookup result itself is discarded.
    ///
    /// # Errors
    ///
    /// Returns the iterator's error if it has one, a [`RainDBError::TableBuild`] if finalizing
    /// the table fails, or the error from the table cache lookup.
    ///
    /// # Panics
    ///
    /// Panics if there is no active table builder or if the current output file's number is
    /// zero.
    pub(crate) fn finish_compaction_output_file(
        &mut self,
        table_cache: Arc<TableCache>,
        iterator: &mut MergingIterator,
    ) -> RainDBResult<()> {
        assert!(self.has_table_builder());

        let table_file_number = self.current_output_mut().file_number();
        assert!(table_file_number != 0);

        // Read the entry count before the builder is abandoned or finalized.
        let num_table_entries = self.table_builder_mut().get_num_entries();

        // Hold on to the outcome instead of returning right away: the size bookkeeping and the
        // builder teardown below must run on the failure paths as well.
        let build_outcome: RainDBResult<()> = match iterator.get_error() {
            Some(iter_err) => {
                self.table_builder_mut().abandon();
                Err(iter_err)
            }
            None => match self.table_builder_mut().finalize() {
                Ok(_) => Ok(()),
                Err(finalize_err) => Err(RainDBError::TableBuild(finalize_err)),
            },
        };

        let table_size = self.table_builder_mut().file_size();
        self.current_output_mut().set_file_size(table_size);
        self.total_size_bytes += table_size;
        self.table_builder = None;

        build_outcome?;

        if num_table_entries > 0 {
            // The handle is not used; only a failed lookup matters here.
            let _table_iter = table_cache.find_table(table_file_number)?;
            log::info!(
                "Generated table {table_num} at level {level}. It has {num_entries} entries \
                ({num_bytes} bytes).",
                table_num = table_file_number,
                level = self.compaction_manifest.level(),
                num_entries = num_table_entries,
                num_bytes = table_size
            );
        }

        Ok(())
    }

    /// Open a new output file for the compaction and attach a table builder for it.
    ///
    /// While holding the database fields lock, a new file number is taken from the version set,
    /// inserted into `tables_in_use`, and fresh metadata for it is appended to the output files.
    /// The table builder is created after the lock has been released.
    ///
    /// # Errors
    ///
    /// Returns an error if the table builder cannot be created. The file metadata appended for
    /// the new file number is not removed in that case.
    ///
    /// # Panics
    ///
    /// Panics if a table builder is already active.
    pub(crate) fn open_compaction_output_file(
        &mut self,
        db_state: &PortableDatabaseState,
    ) -> RainDBResult<()> {
        assert!(!self.has_table_builder());

        // The guard is dropped at the end of this block, before the table builder is created.
        let file_number: u64 = {
            let mut db_fields_guard = db_state.guarded_db_fields.lock();
            let fresh_number = db_fields_guard.version_set.get_new_file_number();
            db_fields_guard.tables_in_use.insert(fresh_number);
            self.output_files.push(FileMetadata::new(fresh_number));

            fresh_number
        };

        self.table_builder = Some(TableBuilder::new(db_state.options.clone(), file_number)?);

        Ok(())
    }

    /// Record the results of the compaction in the compaction's change manifest.
    ///
    /// Adds the input deletions to the compaction manifest and then registers every output file
    /// at the level one above the compaction level.
    pub(crate) fn finalize_version_manifest(&mut self) {
        let Self {
            compaction_manifest,
            output_files,
            ..
        } = self;

        compaction_manifest.add_input_deletions();
        let parent_level = compaction_manifest.level() + 1;
        for output_file in output_files.iter() {
            compaction_manifest.get_change_manifest_mut().add_file(
                parent_level,
                output_file.file_number(),
                output_file.get_file_size(),
                output_file.clone_key_range(),
            );
        }
    }
}