use std::{
ops::Bound,
sync::{Arc, atomic::Ordering},
};
use crate::{
blob::ValueRef,
error::Result,
internal_key::{InternalKey, ValueKind},
memtable::Memtable,
range_tombstone,
types::{KeyRange, Sequence},
write_batch::BatchOperation,
};
use super::tree::{ImmutableMemtable, LsmTree, RangeTombstone, lock_poisoned};
impl LsmTree {
pub(crate) fn apply_operation(
&self,
operation: BatchOperation,
sequence: Sequence,
batch_index: u32,
) -> Result<()> {
let active_memtable = self
.active_memtable
.read()
.map_err(|_| lock_poisoned("active memtable"))?
.clone();
match operation {
BatchOperation::Put { key, value, .. } => {
active_memtable
.insert(
InternalKey::new(key, sequence, ValueKind::Put, batch_index),
Some(ValueRef::Inline(value)),
)
.map_err(|()| lock_poisoned("memtable entries"))?;
}
BatchOperation::Delete { key, .. } => {
active_memtable
.insert(
InternalKey::new(key, sequence, ValueKind::PointDelete, batch_index),
None,
)
.map_err(|()| lock_poisoned("memtable entries"))?;
}
BatchOperation::DeleteRange { range, .. } => {
let tombstone_bytes = range_tombstone_bytes(&range);
let mut range_tombstones = self
.range_tombstones
.write()
.map_err(|_| lock_poisoned("range tombstones"))?;
range_tombstone::insert_sorted(
&mut range_tombstones,
RangeTombstone {
range,
sequence,
batch_index,
},
);
self.range_tombstone_bytes
.fetch_add(tombstone_bytes, Ordering::AcqRel);
}
}
self.mark_delta_mirror_sequence(sequence);
Ok(())
}
pub(crate) fn active_memtable_bytes(&self) -> Result<u64> {
let active_memtable = self
.active_memtable
.read()
.map_err(|_| lock_poisoned("active memtable"))?
.clone();
Ok(active_memtable
.estimated_bytes()
.saturating_add(self.range_tombstone_bytes.load(Ordering::Acquire)))
}
pub(crate) fn memtable_bytes(&self) -> Result<u64> {
let mut bytes = self
.active_memtable_bytes()?
.saturating_add(self.delta_estimated_bytes()?);
let immutable_memtables = self
.immutable_memtables
.read()
.map_err(|_| lock_poisoned("immutable memtable queue"))?
.clone();
for immutable in immutable_memtables {
bytes = bytes.saturating_add(immutable.estimated_bytes);
}
Ok(bytes)
}
pub(crate) fn immutable_memtable_count(&self) -> usize {
self.immutable_memtable_count.load(Ordering::Acquire)
}
pub(crate) fn has_immutable_memtables(&self) -> bool {
self.has_immutable_memtable_fast()
}
pub(crate) fn has_immutable_memtables_at_or_below(
&self,
max_sequence: Sequence,
) -> Result<bool> {
self.immutable_memtables
.read()
.map_err(|_| lock_poisoned("immutable memtable queue"))
.map(|immutable_memtables| {
immutable_memtables
.iter()
.any(|immutable| immutable.freeze_sequence <= max_sequence)
})
}
pub(crate) fn freeze_active_memtable(&self, freeze_sequence: Sequence) -> Result<bool> {
let mut active_memtable = self
.active_memtable
.write()
.map_err(|_| lock_poisoned("active memtable"))?;
let active = Arc::clone(&active_memtable);
let entries_empty = active
.is_empty()
.map_err(|()| lock_poisoned("memtable entries"))?;
let mut range_tombstones = self
.range_tombstones
.write()
.map_err(|_| lock_poisoned("range tombstones"))?;
if entries_empty && range_tombstones.is_empty() {
return Ok(false);
}
let immutable = ImmutableMemtable {
estimated_bytes: active
.estimated_bytes()
.saturating_add(self.range_tombstone_bytes.load(Ordering::Acquire)),
memtable: active,
range_tombstones: Arc::new(range_tombstones.clone()),
freeze_sequence,
};
self.immutable_memtables
.write()
.map_err(|_| lock_poisoned("immutable memtable queue"))?
.push(immutable);
self.immutable_memtable_count
.fetch_add(1, Ordering::Release);
*active_memtable = Arc::new(Memtable::default());
range_tombstones.clear();
self.range_tombstone_bytes.store(0, Ordering::Release);
Ok(true)
}
}
fn range_tombstone_bytes(range: &KeyRange) -> u64 {
key_range_bytes(range)
.saturating_add(usize_to_u64_saturating(std::mem::size_of::<Sequence>()))
.saturating_add(usize_to_u64_saturating(std::mem::size_of::<u32>()))
}
fn key_range_bytes(range: &KeyRange) -> u64 {
bound_bytes(&range.start).saturating_add(bound_bytes(&range.end))
}
fn bound_bytes(bound: &Bound<Vec<u8>>) -> u64 {
match bound {
Bound::Included(bytes) | Bound::Excluded(bytes) => usize_to_u64_saturating(bytes.len()),
Bound::Unbounded => 0,
}
}
fn usize_to_u64_saturating(value: usize) -> u64 {
match u64::try_from(value) {
Ok(value) => value,
Err(_) => u64::MAX,
}
}