use std::rc::Rc;
use bytes::{Bytes, BytesMut};
use tempest_io::{Io, IoBuf};
use tempest_rt::read_exact;
use crate::{
StorageError,
base::{Comparer, InternalKey},
iterator::StorageIterator,
sst::{SstHandle, block::BlockIterator},
};
/// Forward iterator over the entries of a single SST, reading one block at a
/// time through the shared [`SstHandle`].
pub(crate) struct SstIterator<I: Io, C: Comparer> {
/// Shared handle to the SST; supplies the `fd` to read from and the block index.
handle: Rc<SstHandle<I, C>>,
/// Iterator over the currently loaded block, or `None` when no block is loaded.
current: Option<BlockIterator<C>>,
/// Index of the block to load next. Set to `index + 1` whenever a block is
/// loaded, and to `usize::MAX` when a seek finds no block for its key.
next_index_pos: usize,
/// Key of the entry most recently returned by `next` or staged by `seek`;
/// `seek` does nothing for targets at or before this key.
last_key: Option<InternalKey<C, Bytes>>,
/// Entry located by `seek` and held back until the following `next` call.
peeked: Option<(InternalKey<C, Bytes>, Bytes)>,
}
impl<I: Io, C: Comparer> SstIterator<I, C> {
    /// Creates an iterator over `handle` with no block loaded; the first block
    /// to be read is the one at index 0.
    pub(crate) fn new(handle: Rc<SstHandle<I, C>>) -> Self {
        Self {
            handle,
            next_index_pos: 0,
            current: None,
            peeked: None,
            last_key: None,
        }
    }

    /// Consumes the iterator and gives back the SST handle it was built from.
    pub(crate) fn into_handle(self) -> Rc<SstHandle<I, C>> {
        let Self { handle, .. } = self;
        handle
    }

    /// Reads the block stored at `block_offset` (`block_size` bytes long) from
    /// the handle's `fd` and makes it the current block.
    ///
    /// On success `current` holds a fresh [`BlockIterator`] over the bytes read
    /// and `next_index_pos` is `index + 1`.
    ///
    /// # Errors
    ///
    /// Propagates the failure reported by `read_exact`; the iterator state is
    /// left untouched in that case.
    ///
    /// # Panics
    ///
    /// Panics if the buffer handed back by the read is not exactly
    /// `block_size` bytes long.
    async fn load_block(
        &mut self,
        index: usize,
        block_offset: u64,
        block_size: u32,
    ) -> Result<(), StorageError> {
        let len = block_size as usize;
        let buf = BytesMut::with_capacity(len);
        let (outcome, filled) =
            read_exact::<_, I>(self.handle.fd, buf.slice(..len), block_offset).await;
        outcome?;

        let raw = filled.into_inner().freeze();
        assert_eq!(raw.len(), len);

        self.next_index_pos = index + 1;
        self.current = Some(BlockIterator::new(raw));
        Ok(())
    }

    /// Returns the iterator over the current block, first loading the block at
    /// `next_index_pos` when none is loaded.
    ///
    /// Yields `Ok(None)` when the block index has no block at that position.
    /// A block that is already loaded is returned as-is, even if it has been
    /// fully consumed; callers clear `current` to move on to the next block.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::load_block`].
    async fn next_block(&mut self) -> Result<Option<&mut BlockIterator<C>>, StorageError> {
        if self.current.is_none() {
            let idx = self.next_index_pos;
            let Some((offset, size)) = self.handle.block_index.get_block_by_index(idx) else {
                return Ok(None);
            };
            self.load_block(idx, offset, size).await?;
        }
        Ok(self.current.as_mut())
    }
}
impl<I: Io, C: Comparer> StorageIterator<I, C> for SstIterator<I, C> {
    /// Returns the next entry of the SST, or `Ok(None)` once every block has
    /// been consumed.
    ///
    /// An entry staged by a preceding `seek` is returned first. Otherwise
    /// entries are pulled from the current block, moving on to following
    /// blocks as each one runs out. The key of the returned entry is recorded
    /// in `last_key`.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while loading a block.
    async fn next(&mut self) -> Result<Option<(InternalKey<C, Bytes>, Bytes)>, StorageError> {
        if let Some(entry) = self.peeked.take() {
            self.last_key = Some(entry.0.clone());
            return Ok(Some(entry));
        }

        // Keep advancing until some block yields an entry. Looping (instead of
        // looking at just one following block) means a block that produces no
        // entries cannot be mistaken for the end of the table.
        loop {
            let Some(iter) = self.next_block().await? else {
                return Ok(None);
            };
            if let Some(entry) = iter.next() {
                self.last_key = Some(entry.0.clone());
                return Ok(Some(entry));
            }
            // Current block is exhausted: drop it so that `next_block` loads
            // the block at `next_index_pos` on the next pass.
            self.current = None;
        }
    }

    /// Moves the iterator forward so that the following `next` call returns
    /// the first entry that is not ordered before `key` according to
    /// `compare_logical`.
    ///
    /// The iterator only moves forward: a `key` at or before `last_key` leaves
    /// the position unchanged. Otherwise the block selected by the block index
    /// for `key` is loaded and scanned; a matching entry is staged in `peeked`
    /// (and its key stored in `last_key`). If the block runs out without a
    /// match, iteration resumes from the block after it. If the index has no
    /// block for `key`, the iterator is parked past the end.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while loading the target block.
    async fn seek(&mut self, key: InternalKey<C, Bytes>) -> Result<(), StorageError> {
        if let Some(ref last) = self.last_key {
            if &key <= last {
                return Ok(());
            }
        }

        // Any previously staged entry lies before the new target.
        self.peeked = None;

        let Some((index, block_offset, block_size)) = self.handle.block_index.get_block_for(&key)
        else {
            // No candidate block: make sure later `next` calls find nothing.
            self.current = None;
            self.next_index_pos = usize::MAX;
            return Ok(());
        };
        self.load_block(index, block_offset, block_size).await?;

        // Skip entries ordered before `key`; stop at the first one that is not.
        while let Some(iter) = self.current.as_mut() {
            match iter.next() {
                Some(entry) => {
                    if entry.0.compare_logical(&key).is_ge() {
                        self.last_key = Some(entry.0.clone());
                        self.peeked = Some(entry);
                        break;
                    }
                }
                // Block ran out without a match; clearing `current` ends the
                // loop and lets `next` continue with the following block.
                None => self.current = None,
            }
        }
        Ok(())
    }
}