mod blob_column_builder;
mod blob_column_factory;
mod char_column_builder;
mod column_builder;
mod column_iterator;
mod concrete_column_iterator;
mod primitive_column_builder;
mod primitive_column_factory;
mod row_handler_column_iterator;
use std::future::Future;
use std::io::{Read, Seek, SeekFrom};
pub use blob_column_builder::*;
pub use blob_column_factory::*;
pub use column_builder::*;
pub use column_iterator::*;
pub use concrete_column_iterator::*;
pub use primitive_column_builder::*;
pub use primitive_column_factory::*;
use risinglight_proto::rowset::BlockIndex;
pub use row_handler_column_iterator::*;
mod char_column_factory;
use std::os::unix::fs::FileExt;
use std::sync::{Arc, Mutex};
use bytes::Bytes;
pub use char_column_factory::*;
use moka::future::Cache;
use super::block::BLOCK_META_CHECKSUM_SIZE;
use super::{Block, BlockCacheKey, BlockMeta, ColumnIndex, BLOCK_META_SIZE};
use crate::array::Array;
use crate::storage::secondary::verify_checksum;
use crate::storage::{StorageResult, TracedStorageError};
/// Builds a single column from a sequence of arrays.
pub trait ColumnBuilder<A>
where
    A: Array,
{
    /// Appends `array` to the column being built.
    fn append(&mut self, array: &A);

    /// Consumes the builder and returns its block indexes together with the encoded column
    /// bytes.
    fn finish(self) -> (Vec<BlockIndex>, Vec<u8>);
}
/// Asynchronous, batch-oriented reader over a single column.
pub trait ColumnIterator<A>
where
    A: Array,
{
    /// Future produced by `next_batch`. It resolves to an optional `(u32, A)` pair.
    /// NOTE(review): the `u32` is presumably the row id of the batch's first row, and `None`
    /// presumably means the iterator is exhausted. Confirm against the implementors.
    type NextFuture<'a>: Future<Output = StorageResult<Option<(u32, A)>>> + 'a
    where
        Self: 'a;

    /// Fetches the next batch. `expected_size` optionally hints how many rows the caller wants.
    fn next_batch(&mut self, expected_size: Option<usize>) -> Self::NextFuture<'_>;

    /// Returns a `(usize, bool)` fetch hint.
    /// NOTE(review): the exact meaning of both fields is defined by the implementors.
    fn fetch_hint(&self) -> (usize, bool);

    /// Returns the row id the iterator is currently positioned at.
    fn fetch_current_row_id(&self) -> u32;

    /// Skips `count` rows.
    fn skip(&mut self, count: usize);
}
/// Where a column reader should start.
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum ColumnSeekPosition {
    /// Start at the given row id.
    RowId(u32),
    /// Start by sort key. The variant is not constructed yet, hence the `dead_code` allowance.
    #[allow(dead_code)]
    SortKey(()),
}

impl ColumnSeekPosition {
    /// The very first row, i.e. row id `0`.
    pub fn start() -> Self {
        ColumnSeekPosition::RowId(0)
    }
}
/// Backing storage that a column's blocks are read from.
#[derive(Clone)]
pub enum ColumnReadableFile {
    /// A shared file read with positioned reads (`read_exact_at`), so no lock is taken.
    /// Available on Unix only.
    #[cfg(unix)]
    PositionedRead(Arc<std::fs::File>),
    /// A shared file behind a mutex. Each read seeks and then reads while holding the lock.
    NormalRead(Arc<Mutex<std::fs::File>>),
    /// Column content held entirely in memory. Blocks are cheap sub-slices of it.
    InMemory(Bytes),
}
/// Read handle for a single column.
#[derive(Clone)]
pub struct Column {
    /// Block index. Each entry gives the `offset`/`length` of one block within `file`.
    index: ColumnIndex,
    /// Where the block bytes are read from.
    file: ColumnReadableFile,
    /// Cache of loaded blocks, keyed by `base_block_key` extended with the block id.
    block_cache: Cache<BlockCacheKey, Block>,
    /// Key prefix from which per-block cache keys are derived.
    base_block_key: BlockCacheKey,
}
impl Column {
pub fn new(
index: ColumnIndex,
file: ColumnReadableFile,
block_cache: Cache<BlockCacheKey, Block>,
base_block_key: BlockCacheKey,
) -> Self {
Self {
index,
file,
block_cache,
base_block_key,
}
}
pub fn index(&self) -> &ColumnIndex {
&self.index
}
pub fn on_disk_size(&self) -> u64 {
let lst_idx = self.index.index(self.index.len() as u32 - 1);
lst_idx.offset + lst_idx.length
}
pub async fn get_block(&self, block_id: u32) -> StorageResult<(BlockMeta, Block)> {
let key = self.base_block_key.clone().block(block_id);
let mut block_header = BlockMeta::default();
let mut do_verify_checksum = false;
let block =
self.block_cache
.try_get_with(key, async {
let file = self.file.clone();
let info = self.index.index(block_id).clone();
let block = tokio::task::spawn_blocking(move || {
let data = match file {
ColumnReadableFile::PositionedRead(file) => {
let mut data = vec![0; info.length as usize];
file.read_exact_at(&mut data[..], info.offset)?;
Bytes::from(data)
}
ColumnReadableFile::NormalRead(file) => {
let mut data = vec![0; info.length as usize];
let mut file = file.lock().unwrap();
file.seek(SeekFrom::Start(info.offset))?;
file.read_exact(&mut data[..])?;
Bytes::from(data)
}
ColumnReadableFile::InMemory(file) => file
.slice(info.offset as usize..(info.offset + info.length) as usize),
};
Ok::<_, TracedStorageError>(data)
})
.await
.unwrap();
do_verify_checksum = true;
block
})
.await?;
if block.len() < BLOCK_META_SIZE {
return Err(TracedStorageError::decode(
"block is smaller than header size",
));
}
let mut header = &block[block.len() - BLOCK_META_SIZE..];
block_header.decode(&mut header)?;
if do_verify_checksum {
verify_checksum(
block_header.checksum_type,
&block[..block.len() - BLOCK_META_CHECKSUM_SIZE],
block_header.checksum,
)?;
}
Ok((block_header, block.slice(..block.len() - BLOCK_META_SIZE)))
}
}