use std::cmp::Ordering;
use std::sync::Arc;
use crate::clock::LogicalClock;
use crate::error::{Error, Result};
use crate::{Comparator, InternalKey, InternalKeyRef, LSMIterator, Value};
/// Where a version's sequence number falls relative to the active snapshots.
///
/// Computed by `CompactionIterator::find_earliest_visible_snapshot`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SnapshotVisibility {
/// Carries the smallest active snapshot sequence number that is `>=` the version's
/// sequence number (the earliest snapshot that can see it, assuming a snapshot at `s`
/// sees sequence numbers `<= s`).
BoundedBySnapshot(u64),
/// There are no active snapshots.
NoActiveSnapshots,
/// Snapshots exist, but every one has a smaller sequence number than the version.
NewerThanAllSnapshots,
}
/// Heap-allocated, dynamically dispatched LSM iterator that may borrow data for `'a`.
pub type BoxedLSMIterator<'a> = Box<dyn LSMIterator + 'a>;
/// Minimal binary min-heap over `Copy` items whose ordering is supplied per call.
///
/// Unlike `std::collections::BinaryHeap`, the comparator is passed to every
/// reordering operation rather than fixed by `Ord`. That lets the ordering depend on
/// external, changing state (e.g. the current keys of the iterators the items index
/// into). The item that compares `Ordering::Less` than all others sits at the root.
struct BinaryHeap<T> {
    /// Implicit binary tree: the children of slot `i` live at `2i + 1` and `2i + 2`.
    data: Vec<T>,
}

impl<T: Copy> BinaryHeap<T> {
    /// Creates an empty heap with room for `capacity` items.
    fn with_capacity(capacity: usize) -> Self {
        BinaryHeap {
            data: Vec::with_capacity(capacity),
        }
    }

    /// Returns `true` when the heap holds no items.
    #[inline]
    fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Returns the root item, i.e. the minimum under the comparator last used, if any.
    #[inline]
    fn peek(&self) -> Option<T> {
        self.data.first().copied()
    }

    /// Removes every item while keeping the allocation.
    fn clear(&mut self) {
        self.data.truncate(0);
    }

    /// Inserts `item` and restores heap order using `cmp`.
    fn push<F>(&mut self, item: T, cmp: F)
    where
        F: Fn(T, T) -> Ordering,
    {
        let slot = self.data.len();
        self.data.push(item);
        self.sift_up(slot, cmp);
    }

    /// Removes and returns the root item, or `None` if the heap is empty.
    fn pop<F>(&mut self, cmp: F) -> Option<T>
    where
        F: Fn(T, T) -> Ordering,
    {
        let last = self.data.len().checked_sub(1)?;
        // Move the root to the tail so it can be popped in O(1); the former tail
        // then sinks from the root to its proper place.
        self.data.swap(0, last);
        let top = self.data.pop();
        self.sift_down_root(cmp);
        top
    }

    /// Restores heap order after the root item's priority may have changed.
    fn sift_down_root<F>(&mut self, cmp: F)
    where
        F: Fn(T, T) -> Ordering,
    {
        if self.data.is_empty() {
            return;
        }
        self.sift_down(0, cmp);
    }

    /// Moves the item at `pos` toward the root while it beats its parent.
    fn sift_up<F>(&mut self, mut pos: usize, cmp: F)
    where
        F: Fn(T, T) -> Ordering,
    {
        while let Some(parent) = pos.checked_sub(1).map(|p| p / 2) {
            if cmp(self.data[pos], self.data[parent]) != Ordering::Less {
                break;
            }
            self.data.swap(pos, parent);
            pos = parent;
        }
    }

    /// Moves the item at `pos` toward the leaves while a child beats it.
    fn sift_down<F>(&mut self, mut pos: usize, cmp: F)
    where
        F: Fn(T, T) -> Ordering,
    {
        let len = self.data.len();
        loop {
            // Left child is examined before the right one, so on a tie the left wins.
            let mut best = pos;
            for child in [2 * pos + 1, 2 * pos + 2] {
                if child < len && cmp(self.data[child], self.data[best]) == Ordering::Less {
                    best = child;
                }
            }
            if best == pos {
                return;
            }
            self.data.swap(pos, best);
            pos = best;
        }
    }
}
/// Direction a `MergingIterator` is currently moving in; selects which heap is active.
#[derive(Clone, Copy, PartialEq)]
enum Direction {
/// Ascending key order, served by the min-heap.
Forward,
/// Descending key order, served by the max-heap.
Backward,
}
/// One child iterator of a `MergingIterator`, tagged with its input position.
struct HeapEntry<'a> {
/// The child iterator itself.
iter: BoxedLSMIterator<'a>,
/// Position of this child in the list passed to `MergingIterator::new`; breaks ties
/// between equal keys in the heap comparators (lower index first).
level_idx: usize,
}
/// K-way merge over several LSM iterators, yielding their entries in comparator order.
///
/// The heaps store indices into `children`. Forward iteration uses `min_heap`, and
/// backward iteration uses `max_heap`, which is allocated on first backward use.
pub(crate) struct MergingIterator<'a> {
/// Child iterators, addressed by the indices stored in the heaps.
children: Vec<HeapEntry<'a>>,
/// Valid children ordered by smallest current key; its root is the current entry in forward mode.
min_heap: BinaryHeap<usize>,
/// Valid children ordered by largest current key; `None` until backward iteration is first used.
max_heap: Option<BinaryHeap<usize>>,
/// Current iteration direction.
direction: Direction,
/// Comparator applied to encoded keys.
cmp: Arc<dyn Comparator>,
}
impl<'a> MergingIterator<'a> {
/// Creates a merging iterator over `iterators`.
///
/// Each iterator's position in `iterators` becomes its `level_idx`, which breaks
/// ties between equal keys (lower index first). Both heaps start empty, so the
/// iterator reports invalid until one of the seek methods is called.
pub fn new(iterators: Vec<BoxedLSMIterator<'a>>, cmp: Arc<dyn Comparator>) -> Self {
let capacity = iterators.len();
let children: Vec<_> = iterators
.into_iter()
.enumerate()
.map(|(idx, iter)| HeapEntry {
iter,
level_idx: idx,
})
.collect();
Self {
children,
min_heap: BinaryHeap::with_capacity(capacity),
// Allocated by `init_max_heap` the first time backward iteration is requested.
max_heap: None,
direction: Direction::Forward,
cmp,
}
}
/// Forward-heap ordering of children `a` and `b`: smaller encoded key first, and
/// equal keys ordered by lower `level_idx` first.
///
/// Reads `key()` from both children, so both must be valid.
#[inline]
fn cmp_min(children: &[HeapEntry<'_>], cmp: &dyn Comparator, a: usize, b: usize) -> Ordering {
let key_a = children[a].iter.key().encoded();
let key_b = children[b].iter.key().encoded();
cmp.compare(key_a, key_b).then_with(|| children[a].level_idx.cmp(&children[b].level_idx))
}
/// Backward-heap ordering of children `a` and `b`: larger encoded key first, and
/// equal keys still ordered by lower `level_idx` first.
///
/// NOTE(review): the tie-break is not reversed, so backward order is not the exact
/// mirror of forward order when two children hold byte-identical encoded keys.
/// Presumably encoded internal keys are unique across children; confirm.
#[inline]
fn cmp_max(children: &[HeapEntry<'_>], cmp: &dyn Comparator, a: usize, b: usize) -> Ordering {
let key_a = children[a].iter.key().encoded();
let key_b = children[b].iter.key().encoded();
cmp.compare(key_a, key_b)
.reverse()
.then_with(|| children[a].level_idx.cmp(&children[b].level_idx))
}
/// Allocates the max-heap on first use; later calls are no-ops.
fn init_max_heap(&mut self) {
if self.max_heap.is_none() {
self.max_heap = Some(BinaryHeap::with_capacity(self.children.len()));
}
}
/// Empties both heaps (the max-heap only if it exists), keeping their allocations.
fn clear_heaps(&mut self) {
self.min_heap.clear();
if let Some(ref mut h) = self.max_heap {
h.clear();
}
}
/// Pushes every currently valid child onto the min-heap.
///
/// Does not clear the heap first; callers call `clear_heaps` beforehand.
fn rebuild_min_heap(&mut self) {
let children = &self.children;
let cmp = self.cmp.as_ref();
for i in 0..children.len() {
if children[i].iter.valid() {
self.min_heap.push(i, |a, b| Self::cmp_min(children, cmp, a, b));
}
}
}
/// Pushes every currently valid child onto the max-heap.
///
/// Panics if the max-heap has not been allocated via `init_max_heap`.
fn rebuild_max_heap(&mut self) {
let children = &self.children;
let cmp = self.cmp.as_ref();
let max_heap = self.max_heap.as_mut().unwrap();
for i in 0..children.len() {
if children[i].iter.valid() {
max_heap.push(i, |a, b| Self::cmp_max(children, cmp, a, b));
}
}
}
/// Resets to forward mode with every child positioned by `seek_first`.
fn init_forward(&mut self) -> Result<()> {
self.direction = Direction::Forward;
self.clear_heaps();
for child in &mut self.children {
child.iter.seek_first()?;
}
self.rebuild_min_heap();
Ok(())
}
/// Resets to backward mode with every child positioned by `seek_last`.
fn init_backward(&mut self) -> Result<()> {
self.direction = Direction::Backward;
self.init_max_heap();
self.clear_heaps();
for child in &mut self.children {
child.iter.seek_last()?;
}
self.rebuild_max_heap();
Ok(())
}
/// Switches from backward to forward iteration, moving past `target`.
///
/// `target` is the encoded key of the current entry, which is the max-heap root. That
/// child steps forward with `next()`. Every other child seeks to `target` and then
/// skips entries comparing `<=` `target`, so no child remains at or before it.
fn switch_to_forward(&mut self, target: &[u8]) -> Result<()> {
let current_idx = self.max_heap.as_ref().and_then(|h| h.peek());
self.direction = Direction::Forward;
self.clear_heaps();
for (idx, child) in self.children.iter_mut().enumerate() {
if Some(idx) == current_idx {
child.iter.next()?;
} else {
// A child whose seek reports `false` gets no further repositioning; presumably it is
// then invalid and is left out by `rebuild_min_heap`.
if child.iter.seek(target)? {
while child.iter.valid()
&& self.cmp.compare(child.iter.key().encoded(), target) != Ordering::Greater
{
if !child.iter.next()? {
break;
}
}
}
}
}
self.rebuild_min_heap();
Ok(())
}
/// Switches from forward to backward iteration, moving before `target`.
///
/// `target` is the encoded key of the current entry, which is the min-heap root. That
/// child steps back with `prev()`. Every other child seeks to `target` and steps back
/// while its key compares `>=` `target`. A child whose seek reports `false` is
/// positioned at its last entry instead.
fn switch_to_backward(&mut self, target: &[u8]) -> Result<()> {
let current_idx = self.min_heap.peek();
self.direction = Direction::Backward;
self.init_max_heap();
self.clear_heaps();
for (idx, child) in self.children.iter_mut().enumerate() {
if Some(idx) == current_idx {
child.iter.prev()?;
} else {
if child.iter.seek(target)? {
while child.iter.valid()
&& self.cmp.compare(child.iter.key().encoded(), target) != Ordering::Less
{
if !child.iter.prev()? {
break;
}
}
} else {
child.iter.seek_last()?;
}
}
}
self.rebuild_max_heap();
Ok(())
}
/// Steps the root child of the active heap one entry in the current direction.
///
/// If that child stays valid it is sifted down from the root; otherwise it is popped.
/// Returns whether the active heap is still non-empty. In backward mode this panics if
/// the max-heap was never allocated.
fn advance_winner(&mut self) -> Result<bool> {
match self.direction {
Direction::Forward => {
if self.min_heap.is_empty() {
return Ok(false);
}
let winner = self.min_heap.peek().unwrap();
let valid = self.children[winner].iter.next()?;
let children = &self.children;
let cmp = self.cmp.as_ref();
if valid {
self.min_heap.sift_down_root(|a, b| Self::cmp_min(children, cmp, a, b));
} else {
self.min_heap.pop(|a, b| Self::cmp_min(children, cmp, a, b));
}
Ok(!self.min_heap.is_empty())
}
Direction::Backward => {
let max_heap = self.max_heap.as_mut().unwrap();
if max_heap.is_empty() {
return Ok(false);
}
let winner = max_heap.peek().unwrap();
let valid = self.children[winner].iter.prev()?;
let children = &self.children;
let cmp = self.cmp.as_ref();
if valid {
self.max_heap
.as_mut()
.unwrap()
.sift_down_root(|a, b| Self::cmp_max(children, cmp, a, b));
} else {
self.max_heap.as_mut().unwrap().pop(|a, b| Self::cmp_max(children, cmp, a, b));
}
Ok(!self.max_heap.as_ref().unwrap().is_empty())
}
}
}
/// Whether there is a current entry, i.e. the heap for the current direction is non-empty.
#[inline]
pub fn is_valid(&self) -> bool {
match self.direction {
Direction::Forward => !self.min_heap.is_empty(),
Direction::Backward => self.max_heap.as_ref().is_some_and(|h| !h.is_empty()),
}
}
/// Key of the current entry (the root child of the active heap).
///
/// Panics if the iterator is not valid, because the heap root is unwrapped.
#[inline]
pub fn current_key(&self) -> InternalKeyRef<'_> {
debug_assert!(self.is_valid());
let idx = match self.direction {
Direction::Forward => self.min_heap.peek().unwrap(),
Direction::Backward => self.max_heap.as_ref().unwrap().peek().unwrap(),
};
self.children[idx].iter.key()
}
/// Encoded value of the current entry (the root child of the active heap).
///
/// Panics if the iterator is not valid, because the heap root is unwrapped.
#[inline]
pub fn current_value(&self) -> Result<&[u8]> {
debug_assert!(self.is_valid());
let idx = match self.direction {
Direction::Forward => self.min_heap.peek().unwrap(),
Direction::Backward => self.max_heap.as_ref().unwrap().peek().unwrap(),
};
self.children[idx].iter.value_encoded()
}
}
impl LSMIterator for MergingIterator<'_> {
    /// Seeks every child to `target` and rebuilds the forward (min) heap.
    ///
    /// Returns whether the merged iterator has a current entry afterwards.
    fn seek(&mut self, target: &[u8]) -> Result<bool> {
        self.direction = Direction::Forward;
        self.clear_heaps();
        self.children
            .iter_mut()
            .try_for_each(|entry| entry.iter.seek(target).map(|_| ()))?;
        self.rebuild_min_heap();
        Ok(self.is_valid())
    }

    /// Positions every child at its first entry and iterates forward from there.
    fn seek_first(&mut self) -> Result<bool> {
        self.init_forward().map(|()| self.is_valid())
    }

    /// Positions every child at its last entry and iterates backward from there.
    fn seek_last(&mut self) -> Result<bool> {
        self.init_backward().map(|()| self.is_valid())
    }

    /// Moves to the next entry in ascending order, switching direction if needed.
    fn next(&mut self) -> Result<bool> {
        if !self.is_valid() {
            return Ok(false);
        }
        match self.direction {
            Direction::Forward => self.advance_winner(),
            Direction::Backward => {
                // Copy the current key out first: repositioning the children
                // invalidates the borrow returned by `current_key`.
                let anchor = self.current_key().encoded().to_vec();
                self.switch_to_forward(&anchor)?;
                Ok(self.is_valid())
            }
        }
    }

    /// Moves to the previous entry in descending order, switching direction if needed.
    fn prev(&mut self) -> Result<bool> {
        if !self.is_valid() {
            return Ok(false);
        }
        match self.direction {
            Direction::Backward => self.advance_winner(),
            Direction::Forward => {
                let anchor = self.current_key().encoded().to_vec();
                self.switch_to_backward(&anchor)?;
                Ok(self.is_valid())
            }
        }
    }

    fn valid(&self) -> bool {
        self.is_valid()
    }

    fn key(&self) -> InternalKeyRef<'_> {
        self.current_key()
    }

    fn value_encoded(&self) -> Result<&[u8]> {
        self.current_value()
    }
}
pub(crate) struct CompactionIterator<'a> {
merge_iter: MergingIterator<'a>,
is_bottom_level: bool,
current_user_key: Vec<u8>,
accumulated_versions: Vec<(InternalKey, Value)>,
output_versions: Vec<(InternalKey, Value)>,
enable_versioning: bool,
retention_period_ns: u64,
clock: Arc<dyn LogicalClock>,
initialized: bool,
snapshots: Vec<u64>,
}
impl<'a> CompactionIterator<'a> {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
iterators: Vec<BoxedLSMIterator<'a>>,
cmp: Arc<dyn Comparator>,
is_bottom_level: bool,
enable_versioning: bool,
retention_period_ns: u64,
clock: Arc<dyn LogicalClock>,
snapshots: Vec<u64>,
) -> Self {
let merge_iter = MergingIterator::new(iterators, cmp);
Self {
merge_iter,
is_bottom_level,
current_user_key: Vec::new(),
accumulated_versions: Vec::new(),
output_versions: Vec::new(),
enable_versioning,
retention_period_ns,
clock,
initialized: false,
snapshots,
}
}
fn initialize(&mut self) -> Result<()> {
self.merge_iter.seek_first()?;
self.initialized = true;
Ok(())
}
fn find_earliest_visible_snapshot(&self, seq_num: u64) -> Result<SnapshotVisibility> {
if self.snapshots.is_empty() {
return Ok(SnapshotVisibility::NoActiveSnapshots);
}
if seq_num == 0 {
return Err(Error::InvalidArgument(
"Sequence number 0 is invalid for snapshot visibility check".to_string(),
));
}
match self.snapshots.binary_search(&seq_num) {
Ok(idx) => Ok(SnapshotVisibility::BoundedBySnapshot(self.snapshots[idx])),
Err(idx) => {
if idx < self.snapshots.len() {
Ok(SnapshotVisibility::BoundedBySnapshot(self.snapshots[idx]))
} else {
Ok(SnapshotVisibility::NewerThanAllSnapshots)
}
}
}
}
fn same_visibility_boundary(
&self,
newer_vis: SnapshotVisibility,
older_vis: SnapshotVisibility,
) -> bool {
match (newer_vis, older_vis) {
(
SnapshotVisibility::BoundedBySnapshot(s1),
SnapshotVisibility::BoundedBySnapshot(s2),
) => s1 == s2,
(
SnapshotVisibility::NewerThanAllSnapshots,
SnapshotVisibility::NewerThanAllSnapshots,
) => true,
(SnapshotVisibility::NoActiveSnapshots, SnapshotVisibility::NoActiveSnapshots) => true,
_ => false,
}
}
#[inline]
fn must_preserve_for_snapshot(&self, visibility: SnapshotVisibility) -> bool {
matches!(visibility, SnapshotVisibility::BoundedBySnapshot(_))
}
fn process_accumulated_versions(&mut self) -> Result<()> {
if self.accumulated_versions.is_empty() {
return Ok(());
}
self.accumulated_versions.sort_by_key(|b| std::cmp::Reverse(b.0.seq_num()));
let latest_is_delete_at_bottom = self.is_bottom_level
&& !self.accumulated_versions.is_empty()
&& self.accumulated_versions[0].0.is_hard_delete_marker();
let has_set_with_delete = self.accumulated_versions.iter().any(|(key, _)| key.is_replace());
let mut newer_version_visibility: Option<SnapshotVisibility> = None;
let len = self.accumulated_versions.len();
for i in 0..len {
let (key, value) = &self.accumulated_versions[i];
let is_hard_delete = key.is_hard_delete_marker();
let is_replace = key.is_replace();
let is_latest = i == 0;
let seq_num = key.seq_num();
let current_visibility = self.find_earliest_visible_snapshot(seq_num)?;
let superseded = if let Some(newer_vis) = newer_version_visibility {
let snapshot_allows_drop = match current_visibility {
SnapshotVisibility::BoundedBySnapshot(_) => true,
SnapshotVisibility::NewerThanAllSnapshots => true,
SnapshotVisibility::NoActiveSnapshots => !self.enable_versioning,
};
snapshot_allows_drop
&& !is_latest && self.same_visibility_boundary(newer_vis, current_visibility)
} else {
false
};
let required_by_snapshot =
!superseded && self.must_preserve_for_snapshot(current_visibility);
let should_mark_stale = if superseded {
true
} else if latest_is_delete_at_bottom {
true
} else if required_by_snapshot {
false
} else if is_latest && !is_hard_delete && !is_replace {
false
} else if is_latest && is_hard_delete && self.is_bottom_level {
true
} else if is_latest && is_hard_delete && !self.is_bottom_level {
false
} else if is_latest && is_replace {
false
} else if is_hard_delete {
true
} else if has_set_with_delete && !is_replace {
true
} else {
if !self.enable_versioning {
true
} else if self.retention_period_ns > 0 {
let current_time = self.clock.now();
let age = current_time.saturating_sub(key.timestamp);
age > self.retention_period_ns
} else {
false
}
};
let should_output = if superseded {
false
} else if latest_is_delete_at_bottom {
false
} else if should_mark_stale {
false
} else if self.enable_versioning || required_by_snapshot {
true
} else {
is_latest
};
if should_output {
self.output_versions.push((key.clone(), value.clone()));
}
newer_version_visibility = Some(current_visibility);
}
self.accumulated_versions.clear();
Ok(())
}
pub fn advance(&mut self) -> Result<Option<(InternalKey, Value)>> {
if !self.initialized {
self.initialize()?;
}
loop {
if !self.output_versions.is_empty() {
return Ok(Some(self.output_versions.remove(0)));
}
if !self.merge_iter.is_valid() {
if !self.accumulated_versions.is_empty() {
self.process_accumulated_versions()?;
if !self.output_versions.is_empty() {
return Ok(Some(self.output_versions.remove(0)));
}
}
return Ok(None);
}
let key_owned = self.merge_iter.current_key().to_owned();
let user_key_owned = key_owned.user_key.clone();
let value = self.merge_iter.current_value()?.to_vec();
let is_new_key =
self.current_user_key.is_empty() || user_key_owned != self.current_user_key;
if is_new_key {
if !self.accumulated_versions.is_empty() {
self.process_accumulated_versions()?;
self.current_user_key = user_key_owned;
self.accumulated_versions.push((key_owned, value));
self.merge_iter.next()?;
if !self.output_versions.is_empty() {
return Ok(Some(self.output_versions.remove(0)));
}
} else {
self.current_user_key = user_key_owned;
self.accumulated_versions.push((key_owned, value));
self.merge_iter.next()?;
}
} else {
self.accumulated_versions.push((key_owned, value));
self.merge_iter.next()?;
}
}
}
}
impl Iterator for CompactionIterator<'_> {
    type Item = Result<(InternalKey, Value)>;

    /// Yields the next surviving version. An error is yielded as `Some(Err(_))`, and
    /// exhaustion ends the iteration with `None`.
    fn next(&mut self) -> Option<Self::Item> {
        self.advance().transpose()
    }
}