use futures::Future;
use risinglight_proto::rowset::block_index::BlockType;
use risinglight_proto::rowset::BlockIndex;
use super::super::{Block, BlockIterator};
use super::{Column, ColumnIterator, ColumnSeekPosition};
use crate::array::{Array, ArrayBuilder};
use crate::storage::StorageResult;
pub trait BlockIteratorFactory<A: Array>: Send + Sync + 'static {
type BlockIteratorImpl: BlockIterator<A> + Send + 'static;
fn get_iterator_for(
&self,
block_type: BlockType,
block: Block,
index: &BlockIndex,
start_pos: usize,
) -> Self::BlockIteratorImpl;
fn get_fake_iterator(&self, index: &BlockIndex, start_pos: usize) -> Self::BlockIteratorImpl;
}
#[derive(Debug, Default)]
pub struct Statistics {
next_batch_count: u32,
fetched_block_count: u32,
}
pub struct ConcreteColumnIterator<A: Array, F: BlockIteratorFactory<A>> {
column: Column,
current_block_id: u32,
block_iterator: F::BlockIteratorImpl,
current_row_id: u32,
finished: bool,
factory: F,
is_fake_iter: bool,
statistics: Statistics,
}
impl<A: Array, F: BlockIteratorFactory<A>> ConcreteColumnIterator<A, F> {
pub async fn new(column: Column, start_pos: u32, factory: F) -> StorageResult<Self> {
let current_block_id = column
.index()
.block_of_seek_position(ColumnSeekPosition::RowId(start_pos));
let (header, block) = column.get_block(current_block_id).await?;
Ok(Self {
block_iterator: factory.get_iterator_for(
header.block_type,
block,
column.index().index(current_block_id),
start_pos as usize,
),
column,
current_block_id,
current_row_id: start_pos,
finished: false,
factory,
is_fake_iter: false,
statistics: Statistics {
next_batch_count: 0,
fetched_block_count: 1,
},
})
}
pub async fn next_batch_inner(
&mut self,
expected_size: Option<usize>,
) -> StorageResult<Option<(u32, A)>> {
if self.finished {
return Ok(None);
}
let capacity = if let Some(expected_size) = expected_size {
expected_size
} else {
self.block_iterator.remaining_items()
};
let mut builder = A::Builder::with_capacity(capacity);
let mut total_cnt = 0;
let first_row_id = self.current_row_id;
if self.is_fake_iter {
self.is_fake_iter = false;
let (header, block) = self.column.get_block(self.current_block_id).await?;
self.statistics.fetched_block_count += 1;
self.block_iterator = self.factory.get_iterator_for(
header.block_type,
block,
self.column.index().index(self.current_block_id),
self.current_row_id as usize,
)
}
loop {
let cnt = self
.block_iterator
.next_batch(expected_size.map(|x| x - total_cnt), &mut builder);
total_cnt += cnt;
self.current_row_id += cnt as u32;
if let Some(expected_size) = expected_size {
if total_cnt >= expected_size {
break;
}
} else if total_cnt != 0 {
break;
}
self.current_block_id += 1;
if self.current_block_id >= self.column.index().len() as u32 {
self.finished = true;
break;
}
let (header, block) = self.column.get_block(self.current_block_id).await?;
self.statistics.fetched_block_count += 1;
self.block_iterator = self.factory.get_iterator_for(
header.block_type,
block,
self.column.index().index(self.current_block_id),
self.current_row_id as usize,
);
}
if total_cnt == 0 {
Ok(None)
} else {
Ok(Some((first_row_id, builder.finish())))
}
}
fn fetch_hint_inner(&self) -> (usize, bool) {
if self.finished {
return (0, true);
}
let index = self.column.index().index(self.current_block_id);
(
(index.row_count - (self.current_row_id - index.first_rowid)) as usize,
false,
)
}
fn incre_block_id(&mut self) -> bool {
let len = self.column.index().len() as u32;
self.current_block_id += 1;
if self.current_block_id >= len {
self.finished = true;
true
} else {
false
}
}
fn skip_inner(&mut self, mut cnt: usize) {
if self.finished {
return;
}
self.current_row_id += cnt as u32;
if self.is_fake_iter {
let row_count = self.column.index().index(self.current_block_id).row_count;
let start_pos = self.column.index().index(self.current_block_id).first_rowid;
let mut reached = start_pos + row_count;
while self.current_row_id > reached {
if self.incre_block_id() {
return;
}
let row_count = self.column.index().index(self.current_block_id).row_count;
reached += row_count;
}
return;
}
let remaining_items = self.block_iterator.remaining_items();
if cnt >= remaining_items {
cnt -= remaining_items;
if self.incre_block_id() {
return;
}
} else {
self.block_iterator.skip(cnt);
return;
}
while cnt > 0 {
let row_count = self.column.index().index(self.current_block_id).row_count as usize;
if cnt >= row_count {
cnt -= row_count;
if self.incre_block_id() {
return;
}
} else {
cnt = 0;
}
}
assert_eq!(cnt, 0);
self.is_fake_iter = true;
}
}
impl<A: Array, F: BlockIteratorFactory<A>> ColumnIterator<A> for ConcreteColumnIterator<A, F> {
type NextFuture<'a> = impl Future<Output = StorageResult<Option<(u32, A)>>> + 'a;
fn next_batch(&mut self, expected_size: Option<usize>) -> Self::NextFuture<'_> {
async move {
self.statistics.next_batch_count += 1;
self.next_batch_inner(expected_size).await
}
}
fn fetch_hint(&self) -> (usize, bool) {
self.fetch_hint_inner()
}
fn fetch_current_row_id(&self) -> u32 {
self.current_row_id
}
fn skip(&mut self, cnt: usize) {
self.skip_inner(cnt);
}
}
impl<A: Array, F: BlockIteratorFactory<A>> Drop for ConcreteColumnIterator<A, F> {
fn drop(&mut self) {
tracing::debug!(
"{:#?}, total_block_count:{}, fetch_ratio:{}",
self.statistics,
self.current_block_id,
self.statistics.fetched_block_count as f64 / self.current_block_id as f64
);
}
}