use parking_lot::RwLockWriteGuard;
use crate::db::DbInner;
use crate::db_state::DbState;
use crate::error::SlateDBError;
use crate::mem_table_flush::MemtableFlushMsg;
use crate::utils::SendSafely;
use crate::wal_replay::ReplayedMemtable;
impl DbInner {
pub(crate) fn maybe_freeze_memtable(
&self,
guard: &mut RwLockWriteGuard<'_, DbState>,
wal_id: u64,
) -> Result<(), SlateDBError> {
let meta = guard.memtable().metadata();
if self
.table_store
.estimate_encoded_size(meta.entry_num, meta.entries_size_in_bytes)
< self.settings.l0_sst_size_bytes
{
Ok(())
} else {
self.freeze_memtable(guard, wal_id)
}
}
pub(crate) fn freeze_memtable(
&self,
guard: &mut RwLockWriteGuard<'_, DbState>,
wal_id: u64,
) -> Result<(), SlateDBError> {
if guard.memtable().is_empty() {
return Ok(());
}
guard.freeze_memtable(wal_id)?;
self.memtable_flush_notifier.send_safely(
guard.closed_result_reader(),
MemtableFlushMsg::FlushImmutableMemtables { sender: None },
)?;
Ok(())
}
pub(crate) fn replay_memtable(
&self,
replayed_memtable: ReplayedMemtable,
) -> Result<(), SlateDBError> {
let mut guard = self.state.write();
let recent_flushed_wal_id = if replayed_memtable.last_wal_id > 0 {
replayed_memtable.last_wal_id - 1
} else {
0
};
self.freeze_memtable(&mut guard, recent_flushed_wal_id)?;
let last_wal = replayed_memtable.last_wal_id;
guard.modify(|modifier| modifier.state.manifest.core.next_wal_sst_id = last_wal + 1);
assert!(self.oracle.last_seq.load() <= replayed_memtable.last_seq);
self.oracle.last_seq.store(replayed_memtable.last_seq);
assert!(self.oracle.last_committed_seq.load() <= replayed_memtable.last_seq);
self.oracle
.last_committed_seq
.store(replayed_memtable.last_seq);
self.mono_clock.set_last_tick(replayed_memtable.last_tick)?;
guard.replace_memtable(replayed_memtable.table)
}
}