use crate::btree::block::{BlockCompressionType, BlockHandle};
use crate::btree::footer::BTreeFileFooter;
use crate::btree::meta::BTreeIndexMeta;
use crate::btree::sst_file::SstFileWriter;
use crate::btree::var_len::{encode_var_int, encode_var_long};
use crate::io::FileWrite;
use roaring::RoaringTreemap;
use std::cmp::Ordering;
use std::io;
pub struct BTreeIndexWriter<F: Fn(&[u8], &[u8]) -> Ordering> {
sst_writer: SstFileWriter,
current_row_ids: Vec<i64>,
last_key: Option<Vec<u8>>,
first_key: Option<Vec<u8>>,
null_bitmap: Option<RoaringTreemap>,
row_count: u64,
key_comparator: F,
}
pub struct BTreeWriteResult {
pub meta: BTreeIndexMeta,
pub row_count: u64,
}
impl BTreeIndexWriter<fn(&[u8], &[u8]) -> Ordering> {
pub fn new(
writer: Box<dyn FileWrite>,
block_size: usize,
compression_type: BlockCompressionType,
) -> Self {
Self {
sst_writer: SstFileWriter::new(writer, block_size, compression_type),
current_row_ids: Vec::new(),
last_key: None,
first_key: None,
null_bitmap: None,
row_count: 0,
key_comparator: |a, b| a.cmp(b),
}
}
}
impl<F: Fn(&[u8], &[u8]) -> Ordering> BTreeIndexWriter<F> {
pub fn with_comparator(
writer: Box<dyn FileWrite>,
block_size: usize,
compression_type: BlockCompressionType,
cmp: F,
) -> Self {
Self {
sst_writer: SstFileWriter::new(writer, block_size, compression_type),
current_row_ids: Vec::new(),
last_key: None,
first_key: None,
null_bitmap: None,
row_count: 0,
key_comparator: cmp,
}
}
pub async fn write(&mut self, key: Option<&[u8]>, row_id: i64) -> io::Result<()> {
self.row_count += 1;
match key {
None => {
self.null_bitmap
.get_or_insert_with(RoaringTreemap::new)
.insert(row_id as u64);
}
Some(k) => {
if let Some(ref last) = self.last_key {
if (self.key_comparator)(k, last) != Ordering::Equal {
self.flush_row_ids().await?;
}
}
self.last_key = Some(k.to_vec());
self.current_row_ids.push(row_id);
if self.first_key.is_none() {
self.first_key = Some(k.to_vec());
}
}
}
Ok(())
}
async fn flush_row_ids(&mut self) -> io::Result<()> {
if self.current_row_ids.is_empty() {
return Ok(());
}
let mut value_buf = Vec::with_capacity(self.current_row_ids.len() * 9 + 5);
encode_var_int(&mut value_buf, self.current_row_ids.len() as i32)?;
for &row_id in &self.current_row_ids {
encode_var_long(&mut value_buf, row_id)?;
}
self.current_row_ids.clear();
if let Some(ref key) = self.last_key {
self.sst_writer.put(key, &value_buf).await?;
}
Ok(())
}
async fn write_null_bitmap(&mut self) -> io::Result<Option<BlockHandle>> {
let bitmap = match &self.null_bitmap {
Some(bm) => bm,
None => return Ok(None),
};
let mut serialized = Vec::new();
bitmap.serialize_into(&mut serialized)?;
let length = serialized.len();
let mut hasher = crc32fast::Hasher::new();
hasher.update(&serialized);
let crc_value = hasher.finalize();
let null_bitmap_handle = BlockHandle::new(self.sst_writer.position(), length as u32);
let mut block_data = Vec::with_capacity(length + 4);
block_data.extend_from_slice(&serialized);
block_data.extend_from_slice(&(crc_value as i32).to_le_bytes());
self.sst_writer.write_raw(&block_data).await?;
Ok(Some(null_bitmap_handle))
}
pub async fn finish(mut self) -> io::Result<BTreeWriteResult> {
self.flush_row_ids().await?;
self.sst_writer.flush().await?;
let null_bitmap_handle = self.write_null_bitmap().await?;
let bloom_filter_handle = None;
let index_block_handle = self.sst_writer.write_index_block().await?;
let footer =
BTreeFileFooter::new(bloom_filter_handle, index_block_handle, null_bitmap_handle);
let footer_bytes = footer.write_footer();
self.sst_writer.write_raw(&footer_bytes).await?;
self.sst_writer.close().await?;
let meta = BTreeIndexMeta::new(
self.first_key.clone(),
self.last_key.clone(),
self.null_bitmap.is_some(),
);
Ok(BTreeWriteResult {
meta,
row_count: self.row_count,
})
}
}