use crate::{
InternalValue, SeqNo,
comparator::SharedComparator,
double_ended_peekable::{DoubleEndedPeekable, DoubleEndedPeekableExt},
table::{
block::{Decoder, ParsedItem},
data_block::DataBlockParsedItem,
},
};
pub struct Iter<'a> {
bytes: &'a [u8],
decoder:
DoubleEndedPeekable<DataBlockParsedItem, Decoder<'a, InternalValue, DataBlockParsedItem>>,
comparator: SharedComparator,
}
impl<'a> Iter<'a> {
#[must_use]
pub fn new(
bytes: &'a [u8],
decoder: Decoder<'a, InternalValue, DataBlockParsedItem>,
comparator: SharedComparator,
) -> Self {
let decoder = decoder.double_ended_peekable();
Self {
bytes,
decoder,
comparator,
}
}
pub fn seek_to_offset(&mut self, offset: usize) -> bool {
self.decoder.inner_mut().set_lo_offset(offset);
true
}
pub fn seek_to_key_seqno(&mut self, needle: &[u8], seqno: SeqNo) -> bool {
let cmp = &self.comparator;
self.decoder.inner_mut().seek(
|head_key, head_seqno| match cmp.compare(head_key, needle) {
std::cmp::Ordering::Less => true,
std::cmp::Ordering::Equal => head_seqno >= seqno,
std::cmp::Ordering::Greater => false,
},
false,
)
}
pub fn seek(&mut self, needle: &[u8], seqno: SeqNo) -> bool {
if !self.seek_to_key_seqno(needle, seqno) {
return false;
}
loop {
let Some(item) = self.decoder.peek() else {
return false;
};
match item.compare_key(needle, self.bytes, self.comparator.as_ref()) {
std::cmp::Ordering::Equal => {
return true;
}
std::cmp::Ordering::Greater => {
return false;
}
std::cmp::Ordering::Less => {
#[expect(
clippy::expect_used,
reason = "we peeked a value successfully, so there must be a next item in the stream"
)]
self.decoder.next().expect("should exist");
}
}
}
}
pub fn seek_upper(&mut self, needle: &[u8], _seqno: SeqNo) -> bool {
let cmp = &self.comparator;
if !self.decoder.inner_mut().seek_upper(
|head_key, _| cmp.compare(head_key, needle) != std::cmp::Ordering::Greater,
false,
) {
return false;
}
loop {
let Some(item) = self.decoder.peek_back() else {
return false;
};
match item.compare_key(needle, self.bytes, self.comparator.as_ref()) {
std::cmp::Ordering::Equal => {
return true;
}
std::cmp::Ordering::Less => {
return false;
}
std::cmp::Ordering::Greater => {
#[expect(
clippy::expect_used,
reason = "we peeked a value successfully, so there must be a next item in the stream"
)]
self.decoder.next_back().expect("should exist");
}
}
}
}
pub fn seek_exclusive(&mut self, needle: &[u8], seqno: SeqNo) -> bool {
if !self.seek_to_key_seqno(needle, seqno) {
return false;
}
loop {
let Some(item) = self.decoder.peek() else {
return false;
};
match item.compare_key(needle, self.bytes, self.comparator.as_ref()) {
std::cmp::Ordering::Greater => {
return true;
}
std::cmp::Ordering::Equal | std::cmp::Ordering::Less => {
#[expect(
clippy::expect_used,
reason = "we peeked a value successfully, so there must be a next item in the stream"
)]
self.decoder.next().expect("should exist");
}
}
}
}
pub fn seek_upper_exclusive(&mut self, needle: &[u8], _seqno: SeqNo) -> bool {
let cmp = &self.comparator;
if !self.decoder.inner_mut().seek_upper(
|head_key, _| cmp.compare(head_key, needle) != std::cmp::Ordering::Greater,
false,
) {
return false;
}
loop {
let Some(item) = self.decoder.peek_back() else {
return false;
};
match item.compare_key(needle, self.bytes, self.comparator.as_ref()) {
std::cmp::Ordering::Less => {
return true;
}
std::cmp::Ordering::Equal | std::cmp::Ordering::Greater => {
#[expect(
clippy::expect_used,
reason = "we peeked a value successfully, so there must be a next item in the stream"
)]
self.decoder.next_back().expect("should exist");
}
}
}
}
}
impl Iterator for Iter<'_> {
type Item = DataBlockParsedItem;
fn next(&mut self) -> Option<Self::Item> {
self.decoder.next()
}
}
impl DoubleEndedIterator for Iter<'_> {
fn next_back(&mut self) -> Option<Self::Item> {
self.decoder.next_back()
}
}