use std::path::PathBuf;
use bytes::BytesMut;
use tempest_io::{Io, IoBuf, OpenOptions, Statx};
use tempest_rt::{close_file, open_file, read_exact, stat_file};
use zerocopy::Ref;
use crate::{
base::Comparer,
sst::{SST_MAGICNUM, SstFooter, SstReadError, bloom::BloomFilter, index::BlockIndex},
};
pub(crate) struct SstHandle<I: Io, C: Comparer> {
pub(super) fd: I::Fd,
pub(super) bloom_filter: BloomFilter,
pub(super) block_index: BlockIndex<C>,
}
impl<I: Io, C: Comparer> SstHandle<I, C> {
pub(crate) async fn close(self) -> Result<(), SstReadError> {
close_file::<I>(self.fd).await?;
Ok(())
}
}
#[instrument(level = "debug")]
pub(crate) async fn load<I: Io, C: Comparer>(
path: PathBuf,
) -> Result<SstHandle<I, C>, SstReadError> {
let fd = open_file::<I>(path, OpenOptions::new().read(true)).await?;
let statx = stat_file::<I>(fd).await?;
let file_size = statx.stx_size();
debug!(file_size, "opened SST file");
const FOOTER_SIZE: usize = size_of::<SstFooter>();
let footer_offset = file_size - FOOTER_SIZE as u64;
let (result, s) = read_exact::<_, I>(fd, [0u8; FOOTER_SIZE], footer_offset).await;
result?;
let footer = Ref::<_, SstFooter>::from_bytes(s.as_ref())
.expect("footer buf should have read exactly size_of(footer) bytes");
if footer.magic != *SST_MAGICNUM {
close_file::<I>(fd).await.unwrap();
return Err(SstReadError::InvalidMagic);
}
debug!(?footer, "read SST footer");
let bloom_size = footer.bloom_size.get() as usize;
let bloom_offset = footer.bloom_offset.get();
let bloom_buf = BytesMut::with_capacity(bloom_size);
let (result, s) = read_exact::<_, I>(fd, bloom_buf.slice(..bloom_size), bloom_offset).await;
result?;
let bloom_bits = s.into_inner().freeze();
assert_eq!(bloom_bits.len(), bloom_size);
let bloom_filter = BloomFilter::from_parts(bloom_bits, footer.bloom_footer.clone());
debug!(size = bloom_size, "read bloom filter");
let index_size = footer.index_size.get() as usize;
let index_offset = footer.index_offset.get();
let index_buf = BytesMut::with_capacity(index_size);
let (result, s) = read_exact::<_, I>(fd, index_buf.slice(..index_size), index_offset).await;
result?;
let index_raw = s.into_inner().freeze();
assert_eq!(index_raw.len(), index_size);
let block_index = BlockIndex::new(index_raw);
debug!(size = index_size, "read block index");
debug!("SST loaded");
Ok(SstHandle {
fd,
bloom_filter,
block_index,
})
}