use parking_lot::RwLockWriteGuard;
use crate::db::DbInner;
use crate::db_state::DbState;
use crate::error::SlateDBError;
use crate::oracle::Oracle;
use crate::wal_replay::ReplayedMemtable;
/// Threshold on the gap between the most recently flushed WAL id and the WAL
/// id recorded at the last memtable freeze. `maybe_freeze_current_memtable`
/// freezes the memtable once the gap reaches this value, regardless of the
/// estimated L0 SST size.
pub(crate) const MAX_WAL_FLUSHES_BEFORE_L0_FLUSH: u64 = 4096;
impl DbInner {
    /// Freezes the current memtable if it has reached one of two thresholds.
    ///
    /// The memtable is frozen when either:
    /// - the gap between the WAL buffer's recent flushed WAL id and the WAL id
    ///   of the last freeze is at least [`MAX_WAL_FLUSHES_BEFORE_L0_FLUSH`], or
    /// - the estimated encoded size of the memtable as a compacted SST is at
    ///   least `settings.l0_sst_size_bytes`.
    ///
    /// The "last freeze" WAL id is taken from the front immutable memtable, or,
    /// when there is none, from `replay_after_wal_id` in the core state.
    ///
    /// # Errors
    ///
    /// Returns [`SlateDBError::InvalidDBState`] if the recent flushed WAL id is
    /// smaller than the last-freeze WAL id, and propagates any error from
    /// [`Self::freeze_memtable`].
    pub(crate) fn maybe_freeze_current_memtable(&self) -> Result<(), SlateDBError> {
        // The WAL id is sampled before the state write lock is taken.
        let wal_id = self.wal_buffer.recent_flushed_wal_id();
        let mut guard = self.state.write();
        let meta = guard.memtable().metadata();
        let last_freeze_wal_id = guard
            .state()
            .imm_memtable
            .front()
            .map(|imm| imm.recent_flushed_wal_id())
            .unwrap_or(guard.state().core().replay_after_wal_id);
        let l0_sst_size_est = self
            .table_store
            .estimate_encoded_size_compacted(meta.entry_num, meta.entries_size_in_bytes);
        // A recent flushed WAL id below the last-freeze id means the state is
        // inconsistent; surface that instead of wrapping around.
        let wal_id_gap = wal_id
            .checked_sub(last_freeze_wal_id)
            .ok_or(SlateDBError::InvalidDBState)?;

        // Both thresholds must be unmet for the memtable to stay mutable.
        if wal_id_gap < MAX_WAL_FLUSHES_BEFORE_L0_FLUSH
            && l0_sst_size_est < self.settings.l0_sst_size_bytes
        {
            return Ok(());
        }
        self.freeze_memtable(&mut guard, wal_id)
    }

    /// Freezes the current memtable without checking any size or WAL-gap
    /// threshold, tagging it with the WAL buffer's recent flushed WAL id.
    ///
    /// An empty memtable is still left untouched (see [`Self::freeze_memtable`]).
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::freeze_memtable`].
    pub(crate) fn freeze_current_memtable(&self) -> Result<(), SlateDBError> {
        let wal_id = self.wal_buffer.recent_flushed_wal_id();
        let mut guard = self.state.write();
        self.freeze_memtable(&mut guard, wal_id)
    }

    /// Freezes the memtable behind an already-held state write guard and
    /// notifies the memtable flusher.
    ///
    /// Does nothing when the current memtable is empty. Otherwise the memtable
    /// is frozen with `wal_id` and the flusher is told that a memtable was
    /// frozen.
    ///
    /// # Errors
    ///
    /// The code visible here always returns `Ok(())`.
    pub(crate) fn freeze_memtable(
        &self,
        guard: &mut RwLockWriteGuard<'_, DbState>,
        wal_id: u64,
    ) -> Result<(), SlateDBError> {
        if guard.memtable().is_empty() {
            return Ok(());
        }
        guard.freeze_memtable(wal_id);
        // NOTE(review): the notification result is discarded here; confirm that
        // treating the notification as best-effort is intended.
        let _ = self.memtable_flusher().notify_memtable_frozen();
        Ok(())
    }

    /// Installs a memtable rebuilt from WAL replay as the current memtable.
    ///
    /// In order, under the state write lock, this:
    /// 1. advances the WAL buffer's recent flushed WAL id to
    ///    `last_wal_id - 1` (clamped at 0) and freezes the existing memtable
    ///    with that id,
    /// 2. sets `next_wal_sst_id` in the manifest core to `last_wal_id + 1`,
    /// 3. advances the oracle's last and committed sequence numbers to the
    ///    replayed `last_seq`,
    /// 4. sets the monotonic clock's last tick to the replayed `last_tick`,
    /// 5. replaces the current memtable with the replayed table.
    ///
    /// The resulting manifest is reported to the status manager after the
    /// write lock has been released.
    ///
    /// # Errors
    ///
    /// Propagates errors from [`Self::freeze_memtable`] and from setting the
    /// monotonic clock's last tick.
    ///
    /// # Panics
    ///
    /// Panics if the oracle's last sequence number or last committed sequence
    /// number is already greater than the replayed `last_seq`.
    pub(crate) fn replay_memtable(
        &self,
        replayed_memtable: ReplayedMemtable,
    ) -> Result<(), SlateDBError> {
        let mut guard = self.state.write();
        // `saturating_sub` keeps a `last_wal_id` of 0 at 0 instead of underflowing.
        let recent_flushed_wal_id = replayed_memtable.last_wal_id.saturating_sub(1);
        self.wal_buffer
            .advance_recent_flushed_wal_id(recent_flushed_wal_id);
        self.freeze_memtable(&mut guard, recent_flushed_wal_id)?;

        let last_wal = replayed_memtable.last_wal_id;
        guard.modify(|modifier| modifier.state.manifest.value.core.next_wal_sst_id = last_wal + 1);

        // Sequence numbers must never move backwards as a result of replay.
        assert!(self.oracle.last_seq() <= replayed_memtable.last_seq);
        self.oracle.advance_last_seq(replayed_memtable.last_seq);
        assert!(self.oracle.last_committed_seq() <= replayed_memtable.last_seq);
        self.oracle
            .advance_committed_seq(replayed_memtable.last_seq);

        self.mono_clock.set_last_tick(replayed_memtable.last_tick)?;
        guard.replace_memtable(replayed_memtable.table);

        // Snapshot the manifest, then release the write lock before reporting.
        let dirty_manifest = guard.state().manifest.clone();
        drop(guard);
        self.status_manager.report_manifest(dirty_manifest.into());
        Ok(())
    }
}