use std::{mem::replace, path::PathBuf};
use bytes::{Bytes, BytesMut};
use serde::{Deserialize, Serialize};
use tempest_core::utils::{ByteSize, HexU64};
use tempest_io::{Io, OpenOptions};
use tempest_rt::{close_file, open_file, write_exact};
use zerocopy::IntoBytes;
use crate::{
StorageError,
base::{Comparer, SeqNum},
config::SstWriteConfig,
iterator::StorageIterator,
sst::{
SstFooter,
block::{BlockBuilder, BlockBuilderStatus},
bloom::BloomFilterBuilder,
index::IndexBuilder,
},
};
/// Summary of a finished SST write, as returned by [`write`].
///
/// `Debug` is hand-formatted via `derive_more` so that `file_size` is shown as a
/// [`ByteSize`] and the key and seqnum ranges are shown as `[min, max]` pairs.
///
/// NOTE(review): serde's derive serializes fields in declaration order; reordering
/// the fields below may break compatibility with positional (non-self-describing)
/// formats if this struct is persisted — confirm before changing.
#[derive(derive_more::Debug, Clone, Serialize, Deserialize)]
#[debug(
"SstWriteStats {{ file_size: {:?}, entry_count: {}, key_range: [{:?}, {:?}], seqnum_range: [{}, {}] }}",
ByteSize(*file_size), entry_count, min_key, max_key, min_seqnum.get(), max_seqnum.get(),
)]
pub(crate) struct SstWriteStats {
/// Total bytes written: data blocks, bloom filter, index, and footer.
pub(crate) file_size: u64,
/// Number of entries consumed from the source iterator.
pub(crate) entry_count: u64,
/// Key (as returned by `key()`) of the first entry written; empty if none were written.
pub(crate) min_key: Bytes,
/// Key (as returned by `key()`) of the last entry written; empty if none were written.
pub(crate) max_key: Bytes,
/// Smallest sequence number seen; stays `SeqNum::MAX` if no entries were written.
pub(crate) min_seqnum: SeqNum,
/// Largest sequence number seen; stays `SeqNum::ZERO` if no entries were written.
pub(crate) max_seqnum: SeqNum,
}
impl Default for SstWriteStats {
    /// Builds the "nothing written yet" statistics.
    ///
    /// Sizes and counts start at zero and both keys start empty. The seqnum
    /// range is deliberately inverted (`min = MAX`, `max = ZERO`). `write`
    /// folds every entry's seqnum in with `min`/`max`, so the first entry
    /// tightens both bounds to its own value.
    fn default() -> Self {
        let (min_seqnum, max_seqnum) = (SeqNum::MAX, SeqNum::ZERO);
        let (min_key, max_key) = (Bytes::new(), Bytes::new());
        Self {
            min_key,
            max_key,
            min_seqnum,
            max_seqnum,
            entry_count: 0,
            file_size: 0,
        }
    }
}
/// Writes every entry yielded by `source` into a new SST file at `path` and
/// returns statistics about what was written.
///
/// The file is laid out in this order:
/// 1. the data blocks;
/// 2. the bloom filter bits;
/// 3. the block index;
/// 4. a trailing [`SstFooter`] recording the offset and size of the bloom filter
///    and of the index.
///
/// Entries are appended to the current [`BlockBuilder`]. When it reports
/// [`BlockBuilderStatus::Full`], the block is written out and an index entry is
/// recorded, keyed by the block's last key and holding the block's offset and
/// length. Once `source` is exhausted, any non-empty trailing block is flushed
/// the same way. The prefix of every key (from `split_up`) is inserted into the
/// bloom filter, which is pre-sized from `entry_estimate`.
///
/// The file is opened with `create` + `truncate`, so an existing file at `path`
/// is overwritten. After a successful open, the descriptor is always closed,
/// even when `source` or a write fails part-way. In that case the partially
/// written file is left on disk.
///
/// `stats.min_key` and `stats.max_key` are the first and last keys yielded. They
/// describe the true key range only if `source` yields keys in order (presumably
/// guaranteed by `StorageIterator` — confirm).
///
/// # Errors
///
/// Returns an error if opening, writing, or closing the file fails, or if
/// `source` fails. If both the write phase and the close fail, the write-phase
/// error is returned.
///
/// # Panics
///
/// Panics if a single data block, the bloom filter, or the index is 4 GiB or
/// larger, since their lengths are recorded as `u32`.
#[instrument(skip(config, source), level = "debug")]
pub(crate) async fn write<I, C, S>(
    path: PathBuf,
    entry_estimate: u64,
    config: SstWriteConfig,
    mut source: S,
) -> Result<SstWriteStats, StorageError>
where
    I: Io,
    C: Comparer,
    S: StorageIterator<I, C>,
{
    let fd = open_file::<I>(
        path,
        OpenOptions::new().write(true).create(true).truncate(true),
    )
    .await?;

    // All fallible work after the open lives in this block, so an early `?`
    // cannot skip the `close_file` below and leak the descriptor.
    let outcome = async {
        let mut filepos = 0u64;
        let mut stats = SstWriteStats::default();
        let mut block_builder =
            BlockBuilder::new(config.block_target_size, config.block_restart_interval);
        let mut bloom_filter_builder =
            BloomFilterBuilder::new(entry_estimate as usize, config.bloom_false_positive_rate);
        let mut index_builder = IndexBuilder::new();
        let mut last_key = None;
        let mut block_count = 0u64;
        let mut total_blocks_size = 0u64;
        let mut exhausted = false;

        while !exhausted {
            // Decide whether the current block must be flushed: either it just
            // filled up, or the source ran dry and a partial block remains.
            let flush = match source.next().await? {
                Some((key, value)) => {
                    let status = block_builder.write_entry(key.key(), key.trailer(), value);
                    if stats.entry_count == 0 {
                        stats.min_key = key.key().clone();
                    }
                    stats.max_key = key.key().clone();
                    stats.min_seqnum = stats.min_seqnum.min(key.trailer().seqnum());
                    stats.max_seqnum = stats.max_seqnum.max(key.trailer().seqnum());
                    stats.entry_count += 1;
                    let (prefix, _) = key.split_up();
                    bloom_filter_builder.insert(prefix);
                    last_key = Some(key.clone());
                    status == BlockBuilderStatus::Full
                }
                None => {
                    exhausted = true;
                    block_builder.get_status() != BlockBuilderStatus::Empty
                }
            };
            if !flush {
                continue;
            }

            let block = replace(
                &mut block_builder,
                BlockBuilder::new(config.block_target_size, config.block_restart_interval),
            )
            .finalize();
            let block_len = block.len();
            let block_size = block_len as u64;
            let offset = filepos;
            filepos += block_size;
            total_blocks_size += block_size;
            // The index maps each block's last key to the block's location.
            let block_last_key = last_key
                .take()
                .expect("a non-empty block always has a last key");
            index_builder.push(
                &block_last_key,
                offset,
                u32::try_from(block_len).expect("block size exceeds 4GiB"),
            );
            trace!(
                block = block_count,
                size = ?ByteSize(block_size),
                offset = ?HexU64(offset),
                "flushed block"
            );
            block_count += 1;
            write_exact::<_, I>(fd, block, offset).await.0?;
        }
        debug!(block_count, size = ?ByteSize(total_blocks_size), "wrote all blocks");

        let bloom_filter = bloom_filter_builder.finalize();
        let bloom_footer = bloom_filter.footer();
        let bloom_bits = bloom_filter.bits().clone();
        let index = index_builder.finalize();

        let bloom_offset = filepos;
        let bloom_size = bloom_bits.len();
        filepos += bloom_size as u64;
        write_exact::<_, I>(fd, bloom_bits, bloom_offset).await.0?;
        debug!(size = ?ByteSize(bloom_size as u64), offset = ?HexU64(bloom_offset), "wrote bloom filter");

        let index_offset = filepos;
        let index_size = index.len();
        filepos += index_size as u64;
        write_exact::<_, I>(fd, index, index_offset).await.0?;
        debug!(size=?ByteSize(index_size as u64), offset=?HexU64(index_offset), "wrote index");

        let footer = SstFooter::new(
            bloom_offset,
            bloom_size.try_into().expect("bloom size exceeds 4GiB"),
            bloom_footer,
            index_offset,
            index_size.try_into().expect("index size exceeds 4GiB"),
        );
        let footer_offset = filepos;
        let footer_bytes = footer.as_bytes();
        let footer_size = footer_bytes.len();
        let mut footer_buf = BytesMut::with_capacity(footer_size);
        footer_buf.extend_from_slice(footer_bytes);
        filepos += footer_size as u64;
        write_exact::<_, I>(fd, footer_buf, footer_offset).await.0?;
        debug!(offset = ?HexU64(footer_offset), "wrote sst footer");

        stats.file_size = filepos;
        // The explicit error type lets `?` inside this async block infer its target.
        Ok::<_, StorageError>(stats)
    }
    .await;

    // Always release the descriptor. If the write phase already failed, report
    // that error rather than a secondary close failure.
    let closed = close_file::<I>(fd).await;
    let stats = outcome?;
    closed?;
    debug!(?stats, "finished writing sst");
    Ok(stats)
}