use parking_lot::MutexGuard;
use std::collections::VecDeque;
use std::ops::Range;
use std::rc::Rc;
use std::sync::atomic::Ordering;
use std::sync::mpsc::TrySendError;
use std::sync::{mpsc, Arc};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
use crate::compaction::errors::CompactionWorkerError;
use crate::compaction::LevelCompactionStats;
use crate::db::{GuardedDbFields, PortableDatabaseState};
use crate::errors::{RainDBError, RainDBResult};
use crate::key::{InternalKey, MAX_SEQUENCE_NUMBER};
use crate::versioning::file_iterators::MergingIterator;
use crate::versioning::{VersionChangeManifest, VersionSet};
use crate::{Operation, RainDbIterator, DB};
use super::errors::CompactionWorkerResult;
use super::manifest::CompactionManifest;
use super::state::CompactionState;
use super::ManualCompactionConfiguration;
/// Name of the background compaction thread (whimsical default build).
#[cfg(not(feature = "strict"))]
const COMPACTION_THREAD_NAME: &str = "raindb-tumtum";

/// Name of the background compaction thread when the `strict` feature is enabled.
#[cfg(feature = "strict")]
const COMPACTION_THREAD_NAME: &str = "raindb-compact";

/// Queue depth at which the worker starts logging warnings about the size of its
/// thread-internal task buffer.
const TASK_BUFFER_WARNING_THRESHOLD: usize = 2_000;
/// Commands that can be sent to the compaction worker thread.
#[derive(Debug)]
pub(crate) enum TaskKind {
    /// Perform one compaction pass (memtable, manual, or size/seek triggered).
    Compaction,
    /// Shut the worker thread down.
    Terminate,
}
/// Handle to the background compaction thread and the channel used to feed it tasks.
pub(crate) struct CompactionWorker {
    // Join handle for the spawned thread; `None` once it has been handed off via
    // `stop_worker_thread`.
    maybe_background_compaction_handle: Option<JoinHandle<()>>,
    // Bounded sender used to queue `TaskKind` commands onto the worker thread.
    task_sender: mpsc::SyncSender<TaskKind>,
}
impl CompactionWorker {
/// Create a compaction worker and spawn its background thread.
///
/// The thread blocks on a bounded task channel, services each received task, and
/// drains any tasks that arrived while it was busy into a thread-internal buffer.
/// It exits only after a `Terminate` command is received while the database's
/// shutdown flag is set.
///
/// # Errors
///
/// Returns an error if the OS thread could not be spawned.
pub(crate) fn new(db_state: PortableDatabaseState) -> CompactionWorkerResult<Self> {
    // Bounded channel so producers exert back-pressure instead of growing unboundedly.
    let (task_sender, receiver) = mpsc::sync_channel(1000);

    log::info!("Starting up the background compaction thread.");
    let background_thread_handle = thread::Builder::new()
        .name(COMPACTION_THREAD_NAME.to_string())
        .spawn(move || {
            log::info!("Compaction thread initializing.");
            let database_state = db_state;
            let mut task_queue: VecDeque<TaskKind> = VecDeque::new();
            loop {
                log::info!("Compaction thread waiting for tasks.");
                // NOTE(review): this panics if all senders are dropped without a
                // `Terminate` having been sent; the worker relies on
                // `stop_worker_thread` being called before the handle is dropped.
                let channel_task = receiver.recv().unwrap();
                task_queue.push_back(channel_task);
                while !task_queue.is_empty() {
                    let task = task_queue.pop_front().unwrap();
                    match task {
                        TaskKind::Compaction => {
                            log::info!("Compaction thread received the compaction command.");
                            // A pass may discover follow-up work; requeue when it does.
                            if CompactionWorker::compaction_task(&database_state) {
                                task_queue.push_back(TaskKind::Compaction);
                            }
                        }
                        TaskKind::Terminate => {
                            log::info!(
                                "Compaction thread received the termination command. \
                                Shutting down the thread."
                            );
                            // Exits the inner task loop; the outer loop exits below
                            // once the shutdown flag is observed.
                            break;
                        }
                    }

                    // Drain tasks that arrived while we were working so they are
                    // serviced before we block on the channel again.
                    let pending_tasks = receiver.try_iter();
                    let mut pending_tasks_counter: usize = 0;
                    for new_external_task in pending_tasks {
                        task_queue.push_front(new_external_task);
                        pending_tasks_counter += 1;
                    }

                    if pending_tasks_counter > 1 {
                        log::debug!(
                            "Compaction worker found {pending_tasks_counter} pending tasks \
                            and added them to the thread-internal task buffer. There are a \
                            total of {} pending tasks.",
                            task_queue.len()
                        );
                    }

                    if task_queue.len() >= TASK_BUFFER_WARNING_THRESHOLD {
                        // Fixed typo in the original message ("theshold").
                        log::warn!(
                            "Compaction worker task buffer is at {} tasks. The warning \
                            threshold is {TASK_BUFFER_WARNING_THRESHOLD}.",
                            task_queue.len()
                        );
                    }
                }

                if database_state.is_shutting_down.load(Ordering::Acquire) {
                    log::info!("Compaction thread terminated.");
                    break;
                }
            }
        })?;

    let thread = background_thread_handle.thread();
    let thread_name = thread.name().map_or("<unnamed>", |name| name);
    log::info!(
        "Compaction thread started with name {thread_name}.",
        thread_name = thread_name
    );

    let worker = Self {
        maybe_background_compaction_handle: Some(background_thread_handle),
        task_sender,
    };

    Ok(worker)
}
/// Queue a task for the compaction worker thread.
///
/// Tries a non-blocking send first. If the channel buffer is full, falls back to
/// a blocking send. A disconnected receiver is logged on every path rather than
/// panicking the caller (the original code `unwrap`ed the blocking fallback,
/// which would panic if the worker thread had already exited).
pub(crate) fn schedule_task(&self, task_kind: TaskKind) {
    if let Err(try_send_err) = self.task_sender.try_send(task_kind) {
        match try_send_err {
            TrySendError::Full(sent_task_kind) => {
                log::warn!(
                    "The compaction worker task channel is full and needs to block until the \
                    receiver clears the buffer."
                );
                // Mirror the `Disconnected` arm below instead of panicking if the
                // receiver hung up while we were waiting.
                if let Err(send_err) = self.task_sender.send(sent_task_kind) {
                    log::error!(
                        "The receiver for the compaction thread was disconnected when sending \
                        the following command: {sent_task_kind:?}",
                        sent_task_kind = send_err.0
                    );
                }
            }
            TrySendError::Disconnected(sent_task_kind) => {
                log::error!(
                    "The receiver for the compaction thread was disconnected when sending \
                    the following command: {sent_task_kind:?}"
                );
            }
        }
    }
}
/// Signal the worker thread to terminate and hand back its join handle.
///
/// Returns `None` when the handle was already taken by a previous call. A send
/// failure just means the worker is already gone, so it is only logged.
pub(crate) fn stop_worker_thread(&mut self) -> Option<JoinHandle<()>> {
    let compaction_thread_handle = self.maybe_background_compaction_handle.take()?;
    if self.task_sender.send(TaskKind::Terminate).is_err() {
        log::debug!("Compaction worker thread has already been terminated.");
    }

    Some(compaction_thread_handle)
}
}
impl CompactionWorker {
/// Run one compaction pass and report whether a follow-up pass should be scheduled.
///
/// Skips the actual work when the database is shutting down or already in a bad
/// state, but always clears `background_compaction_scheduled` and notifies
/// waiters on `background_work_finished_signal` before returning.
fn compaction_task(db_state: &PortableDatabaseState) -> bool {
    let PortableDatabaseState {
        guarded_db_fields,
        is_shutting_down,
        background_work_finished_signal,
        ..
    } = db_state;
    // Held for the remainder of the pass; `compact_tables` releases it
    // temporarily via `unlocked_fair` while merging.
    let mut db_fields_guard = guarded_db_fields.lock();

    if is_shutting_down.load(Ordering::Acquire) {
        log::info!(
            "Compaction thread discovered that the database was shutting down. Halting \
            compaction work."
        );
    } else if db_fields_guard.maybe_bad_database_state.is_some() {
        log::warn!(
            "Compaction thread discovered that the database was in a bad state. Halting \
            compaction work."
        );
    } else {
        CompactionWorker::coordinate_compaction(db_state, &mut db_fields_guard);
    }

    // The pass is over; let any waiting foreground threads proceed.
    db_fields_guard.background_compaction_scheduled = false;
    background_work_finished_signal.notify_all();

    // Check (still under the lock) whether more compaction work is needed.
    if DB::should_schedule_compaction(db_state, &mut db_fields_guard) {
        log::info!(
            "Determined that another compaction is necessary. Scheduling compaction task."
        );
        return true;
    }

    log::debug!("No follow-up compaction work detected.");
    false
}
/// Decide what kind of compaction to run and execute it.
///
/// Priority order: (1) compact the immutable memtable if one exists, (2) honor a
/// pending manual compaction request, (3) otherwise ask the version set to pick
/// a size/seek triggered compaction. Errors are recorded on the database state
/// rather than returned.
fn coordinate_compaction(
    db_state: &PortableDatabaseState,
    db_fields_guard: &mut MutexGuard<GuardedDbFields>,
) {
    if db_fields_guard.maybe_immutable_memtable.is_some() {
        log::info!(
            "Compaction thread found an immutable memtable to compact. Proceeding with \
            memtable compaction."
        );
        CompactionWorker::compact_memtable(db_state, db_fields_guard);
        return;
    }

    let is_manual_compaction = db_fields_guard.maybe_manual_compaction.is_some();
    // Largest key among the selected compaction-level files. Used afterwards to
    // resume a manual compaction that could not cover its whole range in one pass.
    let mut manual_compaction_end_key: Option<InternalKey> = None;
    let maybe_compaction_manifest: Option<CompactionManifest>;
    if is_manual_compaction {
        let compaction_level: usize;
        let compaction_range: Range<Option<InternalKey>>;
        // Scope the manual-compaction lock so it is released before we touch the
        // version set.
        {
            let manual_compaction = db_fields_guard
                .maybe_manual_compaction
                .as_ref()
                .unwrap()
                .lock();
            log::info!(
                "Found manual compaction request for level {} and attempting to execute.",
                manual_compaction.level
            );
            compaction_level = manual_compaction.level;
            compaction_range = manual_compaction.clone_key_range();
        }

        maybe_compaction_manifest = db_fields_guard
            .version_set
            .compact_range(compaction_level, compaction_range);
        // Re-acquire the manual compaction lock to record progress.
        let mut manual_compaction = db_fields_guard
            .maybe_manual_compaction
            .as_ref()
            .unwrap()
            .lock();
        // No manifest means nothing left to compact in the requested range.
        manual_compaction.done = maybe_compaction_manifest.is_none();
        if maybe_compaction_manifest.is_some() {
            manual_compaction_end_key = Some(
                maybe_compaction_manifest
                    .as_ref()
                    .unwrap()
                    .get_compaction_level_files()
                    .last()
                    .unwrap()
                    .largest_key()
                    .clone(),
            );
        }

        CompactionWorker::log_manual_compaction_summary(
            &manual_compaction,
            manual_compaction_end_key.as_ref(),
        );
    } else {
        log::info!(
            "Compaction thread proceeding with normal compaction procedure. Determining if a \
            size triggered or seek triggered compaction is required."
        );
        maybe_compaction_manifest = db_fields_guard.version_set.pick_compaction();
    }

    let mut has_compaction_error = false;
    if let Some(mut compaction_manifest) = maybe_compaction_manifest {
        // Trivial-move optimization is only taken for automatic compactions.
        if !is_manual_compaction && compaction_manifest.is_trivial_move() {
            assert!(compaction_manifest.get_compaction_level_files().len() == 1);

            log::debug!("Determined that the compaction can be completed with a trival move");
            compaction_manifest.set_change_manifest_for_trivial_move();
            let apply_result = VersionSet::log_and_apply(
                db_fields_guard,
                compaction_manifest.get_change_manifest_mut(),
            );
            if let Err(error) = apply_result {
                log::error!(
                    "Compaction thread encountered an error while applying a trivial move. \
                    Error: {error}",
                    error = &error
                );
                DB::set_bad_database_state(
                    db_state,
                    db_fields_guard,
                    CompactionWorkerError::VersionManifestError(error).into(),
                );
                has_compaction_error = true;
            }

            let file_to_compact = Arc::clone(
                compaction_manifest
                    .get_compaction_level_files()
                    .first()
                    .unwrap(),
            );
            log::info!(
                "Moved file with number {file_num} ({file_size} bytes) from level \
                {parent_level} to level {new_level}. Level summary: {level_summary}",
                file_num = file_to_compact.file_number(),
                file_size = file_to_compact.get_file_size(),
                parent_level = compaction_manifest.level(),
                new_level = compaction_manifest.level() + 1,
                level_summary = db_fields_guard.version_set.level_summary()
            );
        } else {
            // Full table merge; errors surface through the returned result.
            let compaction_result = CompactionWorker::compact_tables(
                db_state,
                db_fields_guard,
                compaction_manifest,
            );
            match compaction_result {
                Ok(mut compaction_state) => {
                    CompactionWorker::cleanup_compaction(
                        db_fields_guard,
                        &mut compaction_state,
                    );
                    compaction_state
                        .compaction_manifest_mut()
                        .release_inputs(&mut db_fields_guard.version_set);
                    DB::remove_obsolete_files(
                        db_fields_guard,
                        db_state.options.filesystem_provider(),
                        &db_state.file_name_handler,
                        &*db_state.table_cache,
                    )
                }
                Err(err) => {
                    log::error!(
                        "Compaction thread encountered an error while compacting tables. \
                        Error: {error}",
                        error = &err
                    );
                    DB::set_bad_database_state(db_state, db_fields_guard, err);
                    has_compaction_error = true;
                }
            }
        }
    }

    if has_compaction_error && db_state.is_shutting_down.load(Ordering::Acquire) {
        log::warn!(
            "Encountered an error during compaction but ignoring since we are shutting down."
        );
    }

    if is_manual_compaction {
        let manual_compaction = db_fields_guard.maybe_manual_compaction.take().unwrap();
        let mut manual_compaction_guard = manual_compaction.lock();
        if !manual_compaction_guard.done {
            // Only part of the range was compacted; move the start of the range
            // forward so the next request resumes after the last compacted key.
            manual_compaction_guard.begin = Some(manual_compaction_end_key.unwrap());
        }
    }
}
/// Flush the immutable memtable to a level table file and apply the resulting
/// version change.
///
/// On any failure (write error, shutdown detected mid-flush, or manifest apply
/// error) the error is recorded via `DB::set_bad_database_state` and the
/// memtable is left in place.
fn compact_memtable(
    db_state: &PortableDatabaseState,
    db_fields_guard: &mut MutexGuard<GuardedDbFields>,
) {
    assert!(db_fields_guard.maybe_immutable_memtable.is_some());

    log::info!("Compacting the immutable memtable to a table file.");
    let mut change_manifest = VersionChangeManifest::default();
    // Pin the current version while the table file is written.
    let base_version = db_fields_guard.version_set.get_current_version();
    let immutable_memtable = db_fields_guard.maybe_immutable_memtable.clone().unwrap();
    let write_table_result = DB::convert_memtable_to_file(
        db_state,
        db_fields_guard,
        Arc::clone(&immutable_memtable),
        Some(&base_version),
        &mut change_manifest,
    );
    // Release the pinned version regardless of the write outcome.
    db_fields_guard.version_set.release_version(base_version);

    if let Err(write_table_error) = write_table_result {
        DB::set_bad_database_state(
            db_state,
            db_fields_guard,
            CompactionWorkerError::WriteTable(Box::new(write_table_error)).into(),
        );
        return;
    }

    if db_state.is_shutting_down.load(Ordering::Acquire) {
        log::warn!(
            "Compaction thread discovered that the database was shutting down. Halting \
            compaction work. Recording background error to stop other writes from occurring."
        );
        DB::set_bad_database_state(
            db_state,
            db_fields_guard,
            CompactionWorkerError::UnexpectedState(
                "Detected database shutdown signal while compacting the memtable.".to_string(),
            )
            .into(),
        );
        return;
    }

    // The flushed memtable's WAL is no longer needed; record the current WAL as
    // the active one in the change manifest.
    change_manifest.prev_wal_file_number = None;
    change_manifest.wal_file_number = Some(db_fields_guard.curr_wal_file_number);

    log::info!(
        "Compaction thread is applying memtable compaction change manifest to the current \
        version."
    );
    let apply_result = VersionSet::log_and_apply(db_fields_guard, &mut change_manifest);
    if let Err(apply_error) = apply_result {
        log::error!(
            "There was an error logging and applying the change manifest. Error: {}",
            apply_error
        );
        DB::set_bad_database_state(
            db_state,
            db_fields_guard,
            CompactionWorkerError::VersionManifestError(apply_error).into(),
        );
        return;
    }

    log::info!(
        "Compaction thread committing to new database state. Removing immutable memtable and \
        obsolete files."
    );
    // Drop the memtable and clear the flag only after the new version is durable.
    db_fields_guard.maybe_immutable_memtable.take();
    db_state
        .has_immutable_memtable
        .store(false, Ordering::Release);
    DB::remove_obsolete_files(
        db_fields_guard,
        db_state.options.filesystem_provider(),
        Arc::clone(&db_state.file_name_handler).as_ref(),
        Arc::clone(&db_state.table_cache).as_ref(),
    );
}
/// Merge the files selected by `compaction_manifest` into new output table files.
///
/// The database field lock is released (fairly) for the duration of the merge so
/// foreground work can continue; it is re-acquired before stats are recorded and
/// results installed. Errors encountered during the merge are recorded via
/// `DB::set_bad_database_state`, and the `CompactionState` is still returned so
/// the caller can run cleanup.
fn compact_tables(
    db_state: &PortableDatabaseState,
    db_fields_guard: &mut MutexGuard<GuardedDbFields>,
    compaction_manifest: CompactionManifest,
) -> RainDBResult<CompactionState> {
    let compaction_instant = Instant::now();
    // Time spent servicing memtable compactions mid-merge; excluded from the
    // table-compaction duration recorded below.
    let mut total_memtable_compaction_time: Duration = Duration::default();
    log::info!(
        "Compacting {num_compaction_level_files} files at level {compaction_level} with \
        {num_parent_files} files at parent level {parent_level}.",
        num_compaction_level_files = compaction_manifest.get_compaction_level_files().len(),
        compaction_level = compaction_manifest.level(),
        num_parent_files = compaction_manifest.get_parent_level_files().len(),
        parent_level = compaction_manifest.level() + 1
    );
    log::info!(
        "The level summary prior to table compaction is {level_summary}",
        level_summary = db_fields_guard.version_set.level_summary()
    );

    assert!(
        db_fields_guard
            .version_set
            .num_files_at_level(compaction_manifest.level())
            > 0
    );

    // The "smallest snapshot" sequence number bounds which shadowed entries may
    // be dropped: the oldest live snapshot's sequence number, or the latest
    // sequence number when there are no snapshots.
    let mut compaction_state: CompactionState = if db_fields_guard.snapshots.is_empty() {
        CompactionState::new(
            compaction_manifest,
            db_fields_guard.version_set.get_prev_sequence_number(),
        )
    } else {
        CompactionState::new(
            compaction_manifest,
            db_fields_guard
                .snapshots
                .oldest()
                .read()
                .element
                .sequence_number(),
        )
    };

    // Release the database lock (fairly) while performing the actual merge.
    let compaction_result = parking_lot::MutexGuard::<'_, GuardedDbFields>::unlocked_fair(
        db_fields_guard,
        || -> RainDBResult<MergingIterator> {
            // Tracks the user key currently being processed so shadowed (older)
            // entries for the same key can be detected.
            let mut maybe_current_user_key: Option<Vec<u8>> = None;
            let mut last_sequence_for_key = MAX_SEQUENCE_NUMBER;
            let mut file_iterator = compaction_state
                .compaction_manifest()
                .make_merging_iterator(Arc::clone(&db_state.table_cache))?;
            file_iterator.seek_to_first()?;
            while file_iterator.is_valid() && !db_state.is_shutting_down.load(Ordering::Acquire)
            {
                // Memtable compaction takes priority: service it mid-merge so
                // writers waiting on the immutable memtable are not stalled.
                if db_state.has_immutable_memtable.load(Ordering::Acquire) {
                    let memtable_compaction_start = Instant::now();
                    let mut db_mutex_guard = db_state.guarded_db_fields.lock();
                    if db_mutex_guard.maybe_immutable_memtable.is_some() {
                        CompactionWorker::compact_memtable(db_state, &mut db_mutex_guard);
                        db_state.background_work_finished_signal.notify_all();
                    }
                    total_memtable_compaction_time += memtable_compaction_start.elapsed();
                }

                // Close the current output file early when the manifest says we
                // should stop before this key.
                if compaction_state.has_table_builder()
                    && compaction_state
                        .compaction_manifest_mut()
                        .should_stop_before_key(file_iterator.current().unwrap().0)
                {
                    compaction_state.finish_compaction_output_file(
                        Arc::clone(&db_state.table_cache),
                        &mut file_iterator,
                    )?;
                }

                let mut should_drop_entry = false;
                let (current_key, current_value) = file_iterator.current().unwrap();
                // First occurrence of this user key in the merge stream: reset
                // the per-key sequence tracking.
                if maybe_current_user_key.is_none()
                    || current_key.get_user_key() != maybe_current_user_key.as_ref().unwrap()
                {
                    maybe_current_user_key = Some(current_key.get_user_key().to_vec());
                    last_sequence_for_key = MAX_SEQUENCE_NUMBER
                }

                #[allow(clippy::if_same_then_else)]
                if last_sequence_for_key <= compaction_state.get_smallest_snapshot() {
                    // A newer entry for this user key already sits at or below
                    // the smallest snapshot, so this older entry is unobservable.
                    should_drop_entry = true;
                } else if current_key.get_operation() == Operation::Delete
                    && current_key.get_sequence_number()
                        <= compaction_state.get_smallest_snapshot()
                    && compaction_state
                        .compaction_manifest_mut()
                        .is_base_level_for_key(current_key)
                {
                    // Tombstone older than every snapshot, and no deeper level
                    // can hold this key, so the tombstone itself can be dropped.
                    should_drop_entry = true;
                }
                last_sequence_for_key = current_key.get_sequence_number();

                log::debug!(
                    "Compaction thread processing table entry with--user key: {user_key:?}, \
                    seq: {seq_num}, operation: {op:?}, is dropping: {is_dropping}, is base \
                    level: {is_base_level}, last seen sequence: {last_seq}, smallest \
                    snapshot: {smallest_snapshot}",
                    user_key = current_key.get_user_key(),
                    seq_num = current_key.get_sequence_number(),
                    op = current_key.get_operation(),
                    is_dropping = should_drop_entry,
                    is_base_level = compaction_state
                        .compaction_manifest_mut()
                        .is_base_level_for_key(current_key),
                    last_seq = last_sequence_for_key,
                    smallest_snapshot = compaction_state.get_smallest_snapshot()
                );

                if !should_drop_entry {
                    // Lazily open an output file on the first kept entry.
                    if !compaction_state.has_table_builder() {
                        compaction_state.open_compaction_output_file(db_state)?;
                    }

                    if compaction_state.table_builder_mut().get_num_entries() == 0 {
                        compaction_state
                            .current_output_mut()
                            .set_smallest_key(Some(current_key.clone()));
                    }
                    compaction_state
                        .current_output_mut()
                        .set_largest_key(Some(current_key.clone()));
                    compaction_state
                        .table_builder_mut()
                        .add_entry(Rc::new(current_key.clone()), current_value)?;

                    // Roll to a new output file once the size target is reached.
                    if compaction_state.table_builder_mut().file_size()
                        >= compaction_state
                            .compaction_manifest()
                            .max_output_file_size_bytes()
                    {
                        compaction_state.finish_compaction_output_file(
                            Arc::clone(&db_state.table_cache),
                            &mut file_iterator,
                        )?;
                    }
                }

                file_iterator.next();
            }

            Ok(file_iterator)
        },
    );

    let mut compaction_error: Option<RainDBError> = None;
    if let Ok(mut file_iterator) = compaction_result {
        if db_state.is_shutting_down.load(Ordering::Acquire) {
            log::debug!("The database is shutting down during a compaction.");
            compaction_error = Some(
                CompactionWorkerError::UnexpectedState(
                    "The database is shutting down during a compaction.".to_owned(),
                )
                .into(),
            );
        } else if compaction_state.has_table_builder() {
            // Flush the final, partially filled output file.
            compaction_error = compaction_state
                .finish_compaction_output_file(
                    Arc::clone(&db_state.table_cache),
                    &mut file_iterator,
                )
                .err();
        }

        // Surface any error the merging iterator accumulated.
        if compaction_error.is_none() {
            compaction_error = file_iterator.get_error();
        }
    } else {
        compaction_error = compaction_result.err();
    }

    let compaction_duration = compaction_instant.elapsed() - total_memtable_compaction_time;
    let compaction_input_bytes = compaction_state
        .compaction_manifest()
        .compaction_input_read_bytes();
    let compaction_output_bytes = compaction_state.get_output_size();
    let compaction_stats = LevelCompactionStats {
        compaction_duration,
        bytes_read: compaction_input_bytes,
        bytes_written: compaction_output_bytes,
    };
    // Stats are attributed to the output (parent) level.
    db_fields_guard.compaction_stats[compaction_state.compaction_manifest().level() + 1] +=
        compaction_stats;

    if compaction_error.is_none() {
        let install_result = CompactionWorker::install_compaction_results(
            db_fields_guard,
            &mut compaction_state,
        );
        compaction_error = install_result.err().map(|worker_error| worker_error.into());
    }

    if let Some(error) = compaction_error {
        log::error!("Setting compaction error: {}", &error);
        DB::set_bad_database_state(db_state, db_fields_guard, error);
    }

    log::info!(
        "Compaction thread finished compaction. The level summary is now {level_summary}",
        level_summary = db_fields_guard.version_set.level_summary()
    );

    Ok(compaction_state)
}
/// Record the outcome of a table compaction and apply its version change
/// manifest to the current version.
///
/// # Errors
///
/// Propagates any failure from `VersionSet::log_and_apply`.
fn install_compaction_results(
    db_fields_guard: &mut MutexGuard<GuardedDbFields>,
    compaction_state: &mut CompactionState,
) -> CompactionWorkerResult<()> {
    // Snapshot the summary numbers first so the immutable borrow of the
    // manifest ends before we take a mutable borrow below.
    let (num_compaction_files, compaction_level, num_parent_files) = {
        let manifest = compaction_state.compaction_manifest();
        (
            manifest.get_compaction_level_files().len(),
            manifest.level(),
            manifest.get_parent_level_files().len(),
        )
    };
    log::info!(
        "Compacted {compaction_level_files} files at level {compaction_level} + \
        {parent_level_files} files at level {parent_level} to {output_bytes} bytes.",
        compaction_level_files = num_compaction_files,
        compaction_level = compaction_level,
        parent_level_files = num_parent_files,
        parent_level = compaction_level + 1,
        output_bytes = compaction_state.total_size_bytes
    );

    compaction_state.finalize_version_manifest();
    VersionSet::log_and_apply(
        db_fields_guard,
        compaction_state
            .compaction_manifest_mut()
            .get_change_manifest_mut(),
    )?;

    Ok(())
}
/// Tear down in-progress compaction artifacts.
///
/// Abandons any partially built table and releases the output file numbers from
/// the in-use set so they can be garbage collected.
fn cleanup_compaction(
    db_fields_guard: &mut MutexGuard<GuardedDbFields>,
    compaction_state: &mut CompactionState,
) {
    if compaction_state.has_table_builder() {
        compaction_state.table_builder_mut().abandon();
    }

    for output_file in compaction_state.get_output_files() {
        let file_number = output_file.file_number();
        db_fields_guard.tables_in_use.remove(&file_number);
    }
}
/// Log a human-readable summary of a manual compaction request.
///
/// `manual_compaction_end_key` is the key the compaction will actually stop at;
/// it must be `Some` whenever the request is not yet marked done.
fn log_manual_compaction_summary(
    manual_compaction_config: &ManualCompactionConfiguration,
    manual_compaction_end_key: Option<&InternalKey>,
) {
    let compaction_start_string: String = manual_compaction_config.begin.as_ref().map_or_else(
        || "(begin)".to_string(),
        |key| format!("{:?}", Vec::<u8>::from(key)),
    );
    let compaction_end_string: String = manual_compaction_config.end.as_ref().map_or_else(
        || "(end)".to_string(),
        |key| format!("{:?}", Vec::<u8>::from(key)),
    );
    let manual_end_string: String = if manual_compaction_config.done {
        "the specified end key".to_string()
    } else {
        format!(
            "potentially smaller key {:?}",
            Vec::<u8>::from(manual_compaction_end_key.unwrap())
        )
    };

    log::info!(
        "Manual compaction requested for level {compaction_level} from {start_key} to \
        {end_key}. Compaction will end at {manual_end_key}.",
        compaction_level = manual_compaction_config.level,
        start_key = compaction_start_string,
        end_key = compaction_end_string,
        manual_end_key = manual_end_string
    );
}
}