use alloc::format;
use alloc::string::{String, ToString};
use alloc::{vec, vec::Vec};
use vpb::{
checksum::Checksumer,
compression::Compressor,
encrypt::Encryptor,
kvstructs::{
bytes::{BufMut, Bytes, BytesMut},
Key, KeyExt,
},
BlockOffset, Checksum, Compression, Marshaller, TableIndex,
};
use super::{
iterator::{BableIterator, Flag, UniTableIterator},
CheapIndex, ChecksumVerificationMode, Options, TableBuilder, FILE_SUFFIX,
};
use crate::{bloom::MayContain, error::*, Block, RefCounter};
/// A fully in-memory table "file": the entire serialized table payload is
/// held in `data` instead of being mapped from disk.
pub struct MemoryFile {
    // Numeric table id; also encoded into `filename`.
    id: u64,
    // Virtual file name derived from `id` (see `Table::new_filename`).
    filename: String,
    // Complete serialized table contents (blocks + index + checksums).
    data: Bytes,
    // Creation timestamp supplied by the caller.
    // NOTE(review): unit (seconds vs millis) is not visible here — confirm at call sites.
    created_at: u64,
}
/// Built table payload that can report its serialized size and write itself
/// into a buffer (see `Table::create_table`).
pub trait TableData {
    /// Number of bytes that `write` will produce.
    fn size(&self) -> usize;
    /// Serializes the data into `writer`, returning the number of bytes
    /// written. Consumes `self`.
    fn write(self, writer: impl BufMut) -> Result<usize>;
}
impl super::Table {
    /// Builds the table contents with `builder` and opens the result as an
    /// in-memory table.
    ///
    /// A `None` build result yields a table backed by an empty buffer.
    pub fn create_table(id: u64, builder: impl TableBuilder, created_at: u64) -> Result<Self> {
        let opts = builder.options();
        // Serialize the built table data (if any) into a contiguous buffer.
        let data = match builder.build()? {
            Some(bd) => {
                let mut bytes = BytesMut::with_capacity(bd.size());
                let _written = bd.write(&mut bytes)?;
                bytes.freeze()
            }
            None => Bytes::new(),
        };
        Self::open_table(
            MemoryFile {
                id,
                filename: Self::new_filename(id),
                data,
                created_at,
            },
            opts,
        )
    }

    /// Opens a table directly from an already-serialized buffer.
    pub fn create_table_from_buffer(
        id: u64,
        data: Bytes,
        created_at: u64,
        opts: RefCounter<Options>,
    ) -> Result<Self> {
        Self::open_table(
            MemoryFile {
                id,
                filename: Self::new_filename(id),
                data,
                created_at,
            },
            opts,
        )
    }

    /// Opens `mf` as a table, initializing the index and (depending on the
    /// checksum verification mode) verifying every block's checksum.
    ///
    /// # Errors
    /// - `Error::EmptyBlock` when compression is enabled but the block size is 0.
    /// - Checksum errors when `OnTableRead`/`OnTableAndBlockRead` verification fails.
    pub fn open_table(mf: MemoryFile, opts: RefCounter<Options>) -> Result<Self> {
        // A zero block size is only valid when compression is disabled.
        if opts.block_size() == 0 && !opts.compression().is_none() {
            return Err(Error::EmptyBlock);
        }
        // Copy the scalar fields out of `mf` BEFORE it is moved into the
        // struct literal below; the original read `mf.data.len()`, `mf.id`
        // and `mf.created_at` after `mmap: mf` had already moved it.
        let table_size = mf.data.len();
        let id = mf.id;
        let created_at = mf.created_at;
        let mut this = RawTable {
            mmap: mf,
            table_size,
            index: RefCounter::new(Default::default()),
            cheap: CheapIndex {
                max_version: 0,
                key_count: 0,
                uncompressed_size: 0,
                on_disk_size: 0,
                bloom_filter_length: 0,
                offsets_length: 0,
                num_entries: 0,
            },
            smallest: Default::default(),
            biggest: Default::default(),
            id,
            checksum: Default::default(),
            created_at,
            index_start: 0,
            index_len: 0,
            has_bloom_filter: false,
            opts,
        };
        this.init_biggest_and_smallest();
        if matches!(
            this.opts.checksum_verification_mode(),
            ChecksumVerificationMode::OnTableRead | ChecksumVerificationMode::OnTableAndBlockRead
        ) {
            return this.verify_checksum().map(|_| this.into()).map_err(|e| {
                #[cfg(feature = "tracing")]
                {
                    tracing::error!(target: "table", err = %e, info = "failed to verify checksum");
                }
                e
            });
        }
        Ok(this.into())
    }

    /// Creation timestamp recorded when the table was created.
    #[inline]
    pub fn created_at(&self) -> u64 {
        self.created_at
    }

    /// Formats a table id as its canonical filename, e.g. `000042.<suffix>`.
    #[inline]
    pub fn id_to_filename(id: u64) -> String {
        format!("{:06}.{}", id, FILE_SUFFIX)
    }

    /// Parses a table id out of a filename of the form `NNNNNN.<suffix>`.
    ///
    /// Returns `(id, true)` on success and `(0, false)` when the suffix is
    /// missing or the leading component is not a valid `u64`.
    #[inline]
    pub fn parse_file_id(path: &str) -> (u64, bool) {
        if !path.ends_with(FILE_SUFFIX) {
            return (0, false);
        }
        // Everything before the first '.' must be the numeric id.
        path.split('.')
            .next()
            .and_then(|name| name.parse::<u64>().ok())
            .map_or((0, false), |id| (id, true))
    }

    /// Canonical filename for a new table; identical to `id_to_filename`.
    #[inline]
    pub fn new_filename(id: u64) -> String {
        Self::id_to_filename(id)
    }
}
/// The concrete, in-memory table state: backing buffer, parsed index
/// metadata, key range, and configuration.
pub struct RawTable {
    // Backing in-memory "file" holding the serialized table bytes.
    mmap: MemoryFile,
    // Total size of the backing buffer in bytes.
    table_size: usize,
    // Full table index; only populated here when encryption is off
    // (encrypted tables go through the index cache instead — see fetch_index).
    index: RefCounter<TableIndex>,
    // Cheap-to-keep summary of the index (counts/sizes), always populated.
    cheap: CheapIndex,
    // Smallest key stored in the table.
    smallest: Key,
    // Biggest key stored in the table.
    biggest: Key,
    // Numeric table id (mirrors mmap.id).
    id: u64,
    // Raw table-level checksum bytes.
    checksum: Bytes,
    // Creation timestamp (mirrors mmap.created_at).
    created_at: u64,
    // Byte offset where the serialized index starts in the buffer.
    index_start: usize,
    // Length in bytes of the serialized index.
    index_len: usize,
    // Whether the index carries a non-empty bloom filter.
    has_bloom_filter: bool,
    // Shared table options (compression, encryption, caches, checksum mode).
    opts: RefCounter<Options>,
}
impl AsRef<RawTable> for RawTable {
    /// Identity conversion, so generic code can accept owned or borrowed tables.
    #[inline]
    fn as_ref(&self) -> &RawTable {
        self
    }
}
impl RawTable {
    /// Largest key stored in this table.
    #[inline]
    pub(super) fn biggest(&self) -> &Key {
        &self.biggest
    }

    /// Smallest key stored in this table.
    #[inline]
    pub(super) fn smallest(&self) -> &Key {
        &self.smallest
    }

    /// Numeric table id.
    #[inline]
    pub(super) fn id(&self) -> u64 {
        self.id
    }

    /// Virtual filename of the backing memory file.
    #[inline]
    pub(super) fn path(&self) -> &str {
        self.mmap.filename.as_str()
    }

    /// Maximum entry version recorded in the (cheap) index.
    #[inline]
    pub(super) fn max_version(&self) -> u64 {
        self.cheap.max_version
    }

    /// Length in bytes of the bloom filter.
    #[inline]
    pub(super) fn bloom_filter_size(&self) -> usize {
        self.cheap.bloom_filter_length
    }

    /// Uncompressed payload size recorded in the index.
    #[inline]
    pub(super) fn uncompressed_size(&self) -> u32 {
        self.cheap.uncompressed_size
    }

    /// Number of keys recorded in the index.
    #[inline]
    pub(super) fn key_count(&self) -> u32 {
        self.cheap.key_count
    }

    /// Number of entries recorded in the index.
    #[inline]
    pub(super) fn num_entries(&self) -> usize {
        self.cheap.num_entries
    }

    /// On-disk (serialized) size of the table.
    #[inline]
    pub(super) fn on_disk_size(&self) -> u32 {
        self.cheap.on_disk_size
    }

    /// Encryption secret from the table options.
    #[inline]
    pub(super) fn secret(&self) -> &[u8] {
        self.opts.encryption().secret()
    }

    /// Compression algorithm configured for this table.
    #[inline]
    pub(super) fn compression(&self) -> Compression {
        self.opts.compression()
    }

    /// Table-level checksum as a byte slice.
    #[inline]
    pub(super) fn checksum(&self) -> &[u8] {
        &self.checksum
    }

    /// Table-level checksum as shared bytes.
    #[inline]
    pub(super) fn checksum_bytes(&self) -> &Bytes {
        &self.checksum
    }

    /// Samples roughly `n` evenly spaced block offsets and returns the first
    /// keys of those blocks that start with `prefix`. Returns an empty vec
    /// when `n == 0`.
    pub(super) fn key_splits(&self, n: usize, prefix: &[u8]) -> Vec<Key> {
        let mut res = Vec::new();
        if n == 0 {
            return res;
        }
        let offsets_len = self.offsets_length();
        // Visit every `jump`-th block offset so about `n` blocks get sampled.
        // The original loop shadowed the parameter and advanced by
        // `offsets_len`, so it only ever inspected block 0.
        let jump = (offsets_len / n).max(1);
        let index = self.fetch_index();
        let mut i = 0;
        while i < offsets_len {
            let bo = &index.offsets[i];
            if bo.key.has_prefix(prefix) {
                res.push(bo.key.clone().into());
            }
            i += jump;
        }
        res
    }

    /// Creates an iterator over this table with the given flags.
    #[inline]
    pub(super) fn iter(&self, opt: Flag) -> UniTableIterator<&RawTable> {
        UniTableIterator::new(self, opt)
    }

    /// Number of block offsets recorded in the index.
    #[inline]
    pub(super) fn offsets_length(&self) -> usize {
        self.cheap.offsets_length
    }

    /// Reads `sz` bytes starting at `offset` from the backing buffer.
    /// Panics if the range is out of bounds.
    #[inline(always)]
    fn read(&self, offset: usize, sz: usize) -> &[u8] {
        &self.mmap.data[offset..offset + sz]
    }

    /// Bloom-filter "does not have" check: returns `true` when the filter
    /// proves `hash` is NOT in the table, `false` when the key MAY be
    /// present. Despite the name, a `false` result does not guarantee
    /// presence; without a bloom filter the answer is always `false`
    /// (the key cannot be ruled out).
    #[inline]
    pub(super) fn contains_hash(&self, hash: u32) -> bool {
        if !self.has_bloom_filter {
            return false;
        }
        #[cfg(feature = "metrics")]
        {
            crate::metrics::BLOOM_HITS.fetch_add(crate::metrics::DOES_NOT_HAVE_ALL, 1)
        }
        let index = self.fetch_index();
        let may_contain = index.bloom_filter.may_contain(hash);
        #[cfg(feature = "metrics")]
        {
            if !may_contain {
                crate::metrics::BLOOM_HITS.fetch_add(crate::metrics::DOES_NOT_HAVE_HIT, 1)
            }
        }
        !may_contain
    }

    /// Whether the table's entire key range lies under `prefix`.
    #[inline]
    pub(super) fn covered_by_prefix(&self, prefix: &[u8]) -> bool {
        self.biggest.parse_key().has_prefix(prefix) && self.smallest.parse_key().has_prefix(prefix)
    }

    /// Verifies the checksum of every block in the table.
    ///
    /// When the verification mode already checks blocks on read
    /// (`OnBlockRead`/`OnTableAndBlockRead`), `block()` has verified the
    /// checksum, so the explicit per-block verification here is skipped.
    #[inline]
    pub(super) fn verify_checksum(&self) -> Result<()> {
        let index = self.fetch_index();
        for i in 0..(index.offsets.len() as isize) {
            let blk = self.block(i, true).map_err(|e| {
                #[cfg(feature = "tracing")]
                {
                    tracing::error!(target: "table", info = "checksum verification failed", err = %e);
                }
                Error::BlockChecksumMismatch { table: self.path().to_string(), block: i as usize}
            })?;
            if !matches!(
                self.opts.checksum_verification_mode(),
                ChecksumVerificationMode::OnBlockRead
                    | ChecksumVerificationMode::OnTableAndBlockRead
            ) {
                Block::verify_checksum(&blk, Options::checksum(&self.opts))
                    .map_err(|e| {
                        #[cfg(feature = "tracing")]
                        {
                            tracing::error!(target: "table", info = "checksum verification failed", err = %e);
                        }
                        Error::BlockOffsetChecksumMismatch { table: self.path().to_string(), block: i as usize, offset: blk.offset }
                    })?;
            }
        }
        Ok(())
    }

    /// Builds the 8-byte block-cache key: big-endian table id (4 bytes)
    /// followed by big-endian block index (4 bytes).
    #[inline(always)]
    fn block_cache_key(&self, idx: usize) -> [u8; core::mem::size_of::<u64>()] {
        let mut key = [0u8; core::mem::size_of::<u64>()];
        assert!(self.id < u32::MAX as u64);
        // Check BEFORE casting: the original compared `idx as u32`, which
        // truncates on 64-bit targets and made the assertion vacuous.
        assert!(idx < u32::MAX as usize);
        key[..4].copy_from_slice(&(self.id as u32).to_be_bytes());
        key[4..].copy_from_slice(&(idx as u32).to_be_bytes());
        key
    }

    /// Loads block `idx`, optionally inserting it into the block cache.
    pub(super) fn block(&self, idx: isize, use_cache: bool) -> Result<RefCounter<Block>> {
        let blk = self.block_inner(idx)?;
        if use_cache {
            self.insert_block_to_caches(idx, blk.clone());
        }
        Ok(blk)
    }

    /// Inserts a decoded block into the block cache (when one is configured).
    #[inline(always)]
    fn insert_block_to_caches(&self, idx: isize, blk: RefCounter<Block>) {
        if let Some(cache) = self.opts.block_cache() {
            let key = self.block_cache_key(idx as usize);
            let blk_size = blk.size() as i64;
            let mut k = BytesMut::with_capacity(core::mem::size_of::<u64>());
            k.put_slice(&key);
            #[cfg(feature = "std")]
            cache.insert(k.freeze(), blk, blk_size);
            #[cfg(not(feature = "std"))]
            cache.insert(k.freeze(), blk.clone());
        }
    }

    /// Reads, decrypts and decompresses block `idx` (cache is consulted but
    /// not populated — see `block`).
    ///
    /// Decoded block layout, back to front: 4-byte checksum length |
    /// checksum | 4-byte entry count | entry offsets (4 bytes each) |
    /// entry data.
    #[allow(clippy::unsound_collection_transmute)]
    #[inline(always)]
    fn block_inner(&self, idx: isize) -> Result<RefCounter<Block>> {
        assert!(idx >= 0, "idx={}", idx);
        if idx >= self.offsets_length() as isize {
            return Err(Error::BlockOutOfRange {
                num_offsets: self.offsets_length() as u64,
                index: idx as u64,
            });
        }
        let idx = idx as usize;
        // Fast path: serve from the block cache when present.
        if let Some(cache) = self.opts.block_cache() {
            let key = self.block_cache_key(idx);
            if let Some(blk) = cache.get(key.as_ref()) {
                return Ok(blk.clone());
            }
        }
        let index = self.fetch_index();
        let bo = &index.offsets[idx];
        let offset = bo.offset as usize;
        let bo_len = bo.len as usize;
        let data = self.read(offset, bo_len);
        let encryption = self.opts.encryption();
        // Decrypt (when enabled) before decompressing.
        let data: Bytes = if encryption.is_some() {
            let v = self.decrypt(data)?;
            v.decompress_into_vec(self.opts.compression())
                .map(From::from)
                .map_err(Error::Compression)?
        } else {
            data.decompress_into_vec(self.opts.compression())
                .map(From::from)
                .map_err(Error::Compression)?
        };
        // NOTE(review): assumes a decoded block is at least 4 bytes; a
        // shorter (corrupt) block panics on the subtraction below.
        let blk_data_len = data.len();
        let mut read_pos = blk_data_len - 4;
        let cks_len =
            u32::from_be_bytes((&data[read_pos..read_pos + 4]).try_into().unwrap()) as usize;
        if cks_len > blk_data_len {
            return Err(Error::InvalidChecksumLength);
        }
        read_pos -= cks_len;
        let checksum = data.slice(read_pos..read_pos + cks_len);
        read_pos -= 4;
        let num_entries =
            u32::from_be_bytes((&data[read_pos..read_pos + 4]).try_into().unwrap()) as usize;
        let entries_index_start = read_pos - (num_entries * 4);
        let entries_index_end = entries_index_start + (num_entries * 4);
        let mut entry_offsets = vec![0; num_entries];
        entry_offsets.copy_from_slice(bytes_to_u32_slice(
            &data[entries_index_start..entries_index_end],
        ));
        let blk = Block {
            offset,
            // Keep everything up to and including the entry-count word; the
            // checksum and its length are carried separately.
            data: data.slice(..read_pos + 4),
            checksum,
            entries_index_start,
            entry_offsets,
        };
        if matches!(
            self.opts.checksum_verification_mode(),
            ChecksumVerificationMode::OnBlockRead | ChecksumVerificationMode::OnTableAndBlockRead
        ) {
            Block::verify_checksum(&blk, Options::checksum(&self.opts))?;
        }
        Ok(RefCounter::new(blk))
    }

    /// Initializes the index and from it the smallest/biggest keys.
    ///
    /// Panics when the index cannot be read or the reversed iterator cannot
    /// produce a biggest key — an unreadable table is a fatal condition here.
    fn init_biggest_and_smallest(&mut self) {
        match self.init_index() {
            Ok(ko) => {
                // The first block offset's key is the smallest key.
                self.smallest = Key::from(ko.key);
                // The last entry of a reversed iteration is the biggest key.
                let mut iter = self.iter(Flag::REVERSED | Flag::NO_CACHE);
                iter.rewind();
                if !iter.valid() {
                    #[cfg(feature = "tracing")]
                    {
                        tracing::error!(target: "table", "failed to initialize biggest key for table: {}", self.path());
                    }
                    panic!(
                        "failed to initialize biggest key for table: {}",
                        self.path()
                    );
                }
                if let Some(key) = iter.key() {
                    self.biggest = key.to_key();
                }
            }
            Err(e) => {
                // On unwind, dump diagnostics about the (possibly corrupt)
                // file to aid crash analysis.
                #[cfg(feature = "tracing")]
                scopeguard::defer_on_unwind! {
                    // Count trailing zero bytes. The original iterated
                    // `len - 1 ..= 0`, an empty range, so the count was
                    // always reported as 0.
                    let mut count = 0;
                    for i in (0..self.mmap.data.len()).rev() {
                        if self.mmap.data[i] != 0 {
                            break;
                        }
                        count += 1;
                    }
                    {
                        tracing::info!("== Recovering from initIndex crash ==");
                        tracing::info!("File info: [id: {}, size: {}, zeros: {}]", self.id, self.table_size, count);
                        tracing::info!("is_encrypt: {}", self.should_decrypt());
                        let mut read_pos = self.table_size;
                        read_pos -= 4;
                        let buf = self.read(read_pos, 4);
                        let checksum_len = u32::from_be_bytes(buf.try_into().unwrap());
                        tracing::info!("checksum length: {}", checksum_len);
                        read_pos -= checksum_len as usize;
                        let buf = self.read(read_pos, checksum_len as usize);
                        let checksum: Checksum = Marshaller::unmarshal(buf).unwrap();
                        tracing::info!("checksum: {:?}", checksum);
                        read_pos -= 4;
                        let buf = self.read(read_pos, 4);
                        let index_len = u32::from_be_bytes(buf.try_into().unwrap());
                        tracing::info!("index len: {}", index_len);
                        // Step back over the index itself. The original
                        // subtracted 4 and read `self.index_len`, which is
                        // still 0 if init_index crashed before setting it.
                        read_pos -= index_len as usize;
                        self.index_start = read_pos;
                        let index_data = self.read(read_pos, index_len as usize);
                        tracing::info!("index: {:?}", index_data);
                    }
                };
                #[cfg(feature = "tracing")]
                {
                    tracing::error!(target: "table", info = "fail to init biggest and smallest index for table", err = %e);
                }
                panic!("{}", e)
            }
        }
    }

    /// Parses the table footer (checksum length | checksum | index length |
    /// index, back to front), verifies the index checksum, populates the
    /// cheap index, and returns the first block offset.
    fn init_index(&mut self) -> Result<BlockOffset> {
        let mut read_pos = self.table_size;
        // 4-byte big-endian checksum length at the very end of the file.
        read_pos -= 4;
        let buf = self.read(read_pos, 4);
        let cks_len = u32::from_be_bytes(buf.try_into().unwrap()) as usize;
        // The serialized checksum itself.
        read_pos -= cks_len;
        let buf = self.read(read_pos, cks_len);
        let cks = Checksum::unmarshal(buf)?;
        // 4-byte big-endian index length.
        read_pos -= 4;
        let buf = self.read(read_pos, 4);
        self.index_len = u32::from_be_bytes(buf.try_into().unwrap()) as usize;
        // The serialized index.
        read_pos -= self.index_len;
        self.index_start = read_pos;
        let data = self.read(read_pos, self.index_len);
        if !data.verify_checksum(cks.sum, Options::checksum(&self.opts)) {
            return Err(Error::ChecksumMismatch);
        }
        let index = self.read_table_index()?;
        // NOTE(review): panics if the index has no offsets — presumably a
        // table is never written empty; confirm against the builder.
        let bo = index.offsets[0].clone();
        let has_bloom_filter = !index.bloom_filter.is_empty();
        self.cheap = CheapIndex {
            max_version: index.max_version,
            key_count: index.key_count,
            uncompressed_size: index.uncompressed_size,
            on_disk_size: self.table_size as u32,
            bloom_filter_length: index.bloom_filter.len(),
            offsets_length: index.offsets.len(),
            num_entries: index.key_count as usize,
        };
        // Encrypted tables keep their index in the index cache instead of
        // holding a decrypted copy here (see fetch_index).
        if !self.should_decrypt() {
            self.index = RefCounter::new(index);
        }
        self.has_bloom_filter = has_bloom_filter;
        Ok(bo)
    }

    /// Returns the table index: the in-memory copy for plaintext tables, or
    /// a (possibly freshly decrypted and cached) copy for encrypted tables.
    ///
    /// Panics when the table is encrypted and no index cache is configured.
    pub(super) fn fetch_index(&self) -> RefCounter<TableIndex> {
        if !self.should_decrypt() {
            return self.index.clone();
        }
        match self.opts.index_cache() {
            Some(cache) => match cache.get(&self.id) {
                Some(index) => index.clone(),
                None => {
                    let index = self.read_table_index()
                        .map(RefCounter::new)
                        .map_err(|e| {
                            #[cfg(feature = "tracing")]
                            {
                                tracing::error!(target: "table", info = "fail to read table index", err = %e);
                            }
                            e
                        })
                        .unwrap();
                    #[cfg(feature = "std")]
                    cache.insert(self.id, index.clone(), self.index_len as i64);
                    #[cfg(not(feature = "std"))]
                    cache.insert(self.id, index.clone());
                    index
                }
            },
            None => {
                panic!("Index Cache must be set for encrypted workloads");
            }
        }
    }

    /// Reads and unmarshals the serialized table index, decrypting first
    /// when encryption is enabled.
    pub(super) fn read_table_index(&self) -> Result<TableIndex> {
        let buf = self.read(self.index_start, self.index_len);
        if self.should_decrypt() {
            self.decrypt(buf)
                .and_then(|data| TableIndex::unmarshal(data.as_slice()).map_err(From::from))
        } else {
            TableIndex::unmarshal(buf).map_err(From::from)
        }
    }

    /// Whether stored data must be decrypted before use.
    #[inline]
    fn should_decrypt(&self) -> bool {
        self.opts.encryption().is_some()
    }

    /// Decrypts `data`, whose trailing `block_size` bytes carry the IV.
    // NOTE(review): calls `encrypt_to_vec` for decryption — presumably the
    // cipher is a stream mode (e.g. CTR) where encrypt == decrypt; confirm
    // against vpb's encryption implementation.
    #[inline]
    fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>> {
        let encryption = self.opts.encryption();
        let block_size = encryption.block_size();
        let iv = &data[data.len() - block_size..];
        let data = &data[..data.len() - block_size];
        data.encrypt_to_vec(encryption.secret(), iv, encryption.algorithm())
            .map_err(From::from)
    }
}
/// Reinterprets a byte slice as a slice of native-endian `u32`s, truncating
/// any trailing partial word.
///
/// # Panics
/// Panics when `bytes` is non-empty and its pointer is not aligned for
/// `u32` — the original cast such pointers unconditionally, which is
/// undefined behavior for misaligned input.
#[inline(always)]
fn bytes_to_u32_slice(bytes: &[u8]) -> &[u32] {
    const DUMMY: &[u32] = &[];
    if bytes.is_empty() {
        return DUMMY;
    }
    let len = bytes.len();
    let ptr = bytes.as_ptr();
    assert_eq!(
        ptr.align_offset(core::mem::align_of::<u32>()),
        0,
        "bytes_to_u32_slice: input is not aligned for u32"
    );
    // SAFETY: `ptr` is non-null, valid for `len` bytes, and aligned for
    // `u32` (asserted above); `len / 4` truncates any trailing partial word,
    // so the resulting slice stays within the original allocation. The
    // borrow of `bytes` keeps the memory alive for the returned lifetime.
    unsafe { core::slice::from_raw_parts(ptr.cast::<u32>(), len / 4) }
}