use std::{
fs::{File, OpenOptions, rename},
io::{BufWriter, Seek, Write},
mem,
path::Path,
time::{SystemTime, UNIX_EPOCH},
};
use crate::encoding;
use bloomfilter::Bloom;
use crate::engine::{PointEntry, RangeTombstone};
use super::{
BlockHandle, MetaIndexEntry, SST_BLOOM_FILTER_FALSE_POSITIVE_RATE,
SST_DATA_BLOCK_CHECKSUM_SIZE, SST_DATA_BLOCK_LEN_SIZE, SST_DATA_BLOCK_MAX_SIZE,
SST_FOOTER_SIZE, SST_HDR_MAGIC, SST_HDR_VERSION, SSTableBloomBlock, SSTableCell,
SSTableDataBlock, SSTableError, SSTableFooter, SSTableHeader, SSTableIndexEntry,
SSTablePropertiesBlock, SSTableRangeTombstoneCell, SSTableRangeTombstoneDataBlock,
};
/// Running statistics accumulated while an SSTable's entries are written.
///
/// The values are filled in by the block-writing functions of this file and
/// finally converted into an `SSTablePropertiesBlock` by `into_properties`.
struct BuildStats {
/// Number of point entries seen; incremented once per entry, deletes included.
record_count: u64,
/// Number of point entries whose value was `None`.
tombstone_count: u64,
/// Smallest LSN passed to `track`; stays `u64::MAX` until the first call.
min_lsn: u64,
/// Largest LSN passed to `track`; stays `0` until the first call.
max_lsn: u64,
/// Smallest timestamp passed to `track`; stays `u64::MAX` until the first call.
min_timestamp: u64,
/// Largest timestamp passed to `track`; stays `0` until the first call.
max_timestamp: u64,
/// Key of the first point entry seen, or `None` if there were none.
/// Presumably the smallest key, assuming callers supply sorted input — confirm.
min_key: Option<Vec<u8>>,
/// Key of the most recent point entry seen, or `None` if there were none.
/// Presumably the largest key, assuming callers supply sorted input — confirm.
max_key: Option<Vec<u8>>,
}
impl BuildStats {
    /// Returns an accumulator with zero counts, no keys, and LSN/timestamp
    /// bounds primed so that the first `track` call sets both ends.
    fn new() -> Self {
        Self {
            min_lsn: u64::MAX,
            min_timestamp: u64::MAX,
            max_lsn: 0,
            max_timestamp: 0,
            record_count: 0,
            tombstone_count: 0,
            min_key: None,
            max_key: None,
        }
    }

    /// Widens the recorded LSN and timestamp ranges to include the given pair.
    fn track(&mut self, lsn: u64, timestamp: u64) {
        if lsn < self.min_lsn {
            self.min_lsn = lsn;
        }
        if lsn > self.max_lsn {
            self.max_lsn = lsn;
        }
        if timestamp < self.min_timestamp {
            self.min_timestamp = timestamp;
        }
        if timestamp > self.max_timestamp {
            self.max_timestamp = timestamp;
        }
    }

    /// Consumes the statistics and produces the properties block.
    ///
    /// `range_count` is stored verbatim as `range_tombstones_count`. The
    /// creation timestamp is the current wall-clock time in nanoseconds since
    /// the Unix epoch: `0` if the clock reads earlier than the epoch and
    /// `u64::MAX` if the value does not fit in a `u64`. Missing keys become
    /// empty byte vectors.
    fn into_properties(self, range_count: usize) -> SSTablePropertiesBlock {
        let Self {
            record_count,
            tombstone_count,
            min_lsn,
            max_lsn,
            min_timestamp,
            max_timestamp,
            min_key,
            max_key,
        } = self;

        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default();
        let creation_timestamp = u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX);

        SSTablePropertiesBlock {
            creation_timestamp,
            record_count,
            tombstone_count,
            range_tombstones_count: range_count as u64,
            min_lsn,
            max_lsn,
            min_timestamp,
            max_timestamp,
            min_key: min_key.unwrap_or_default(),
            max_key: max_key.unwrap_or_default(),
        }
    }
}
/// Writes `data` to `writer` as a framed block: the payload length as a
/// little-endian `u32`, the payload itself, then the little-endian bytes of
/// `super::crc32(data)`.
///
/// Returns the stream position at which the frame starts together with the
/// payload length (`data.len()`, i.e. without the length prefix or checksum).
///
/// # Errors
///
/// Returns `SSTableError::Internal` when the payload length does not fit in a
/// `u32`, and propagates any I/O error from the writer.
fn write_checksummed_block(
    writer: &mut (impl Write + Seek),
    data: &[u8],
) -> Result<(u64, usize), SSTableError> {
    let payload_len = data.len();
    // The position is queried first so the returned offset is the frame start.
    let start = writer.stream_position()?;
    let len_prefix = match u32::try_from(payload_len) {
        Ok(n) => n,
        Err(_) => {
            return Err(SSTableError::Internal(format!(
                "block too large: {} bytes",
                payload_len
            )));
        }
    };
    let crc = super::crc32(data);

    writer.write_all(&len_prefix.to_le_bytes())?;
    writer.write_all(data)?;
    writer.write_all(&crc.to_le_bytes())?;

    Ok((start, payload_len))
}
/// Writes the SSTable header followed by a trailing CRC.
///
/// The header is first encoded with `header_crc` set to zero; the CRC of that
/// encoding is stored in `header_crc`. The header is then encoded again, and
/// the CRC of this final encoding is appended after it in little-endian order.
///
/// # Errors
///
/// Propagates encoding and I/O errors.
fn write_header(writer: &mut impl Write) -> Result<(), SSTableError> {
    let mut header = SSTableHeader {
        magic: SST_HDR_MAGIC,
        version: SST_HDR_VERSION,
        header_crc: 0,
    };

    // Inner CRC: computed over the encoding whose CRC field is still zero.
    let zeroed = encoding::encode_to_vec(&header)?;
    header.header_crc = super::crc32(&zeroed);

    // Outer CRC: computed over the final encoding, inner CRC included.
    let encoded = encoding::encode_to_vec(&header)?;
    let trailer_crc = super::crc32(&encoded);

    writer.write_all(&encoded)?;
    writer.write_all(&trailer_crc.to_le_bytes())?;
    Ok(())
}
/// Writes the accumulated cell bytes as one checksummed data block and
/// records an index entry for it.
///
/// `current_block` is left empty and `block_first_key` is left as `None`, so
/// both are ready for the next block. The index entry uses the block's first
/// key as its separator, and its handle size covers the whole frame (length
/// prefix + encoded block + checksum).
///
/// # Errors
///
/// Propagates encoding and I/O errors. Returns `SSTableError::Internal` if no
/// first key was recorded; note that the block has already been written to
/// `writer` at that point.
fn flush_data_block(
    writer: &mut (impl Write + Seek),
    current_block: &mut Vec<u8>,
    block_first_key: &mut Option<Vec<u8>>,
    index_entries: &mut Vec<SSTableIndexEntry>,
) -> Result<(), SSTableError> {
    let payload = mem::take(current_block);
    let encoded = encoding::encode_to_vec(&SSTableDataBlock { data: payload })?;
    let (offset, data_len) = write_checksummed_block(writer, &encoded)?;

    let separator_key = match block_first_key.take() {
        Some(key) => key,
        None => {
            return Err(SSTableError::Internal(
                "flush_data_block: no first key recorded for block".into(),
            ));
        }
    };
    let size = (SST_DATA_BLOCK_LEN_SIZE + data_len + SST_DATA_BLOCK_CHECKSUM_SIZE) as u64;

    index_entries.push(SSTableIndexEntry {
        separator_key,
        handle: BlockHandle { offset, size },
    });
    Ok(())
}
/// Serializes all point entries into data blocks and writes them to `writer`.
///
/// For each entry an encoded `SSTableCell` header is appended to the current
/// block, followed by the raw key bytes and, when present, the raw value
/// bytes. Every key is also added to `bloom`. A block is flushed as soon as
/// its length reaches `SST_DATA_BLOCK_MAX_SIZE`; a trailing partial block is
/// flushed after the last entry.
///
/// Returns the statistics gathered over the entries together with one index
/// entry per written block.
///
/// # Errors
///
/// Returns `SSTableError::Internal` when a key or value length does not fit
/// in a `u32`, and propagates encoding and I/O errors.
fn write_data_blocks(
    writer: &mut (impl Write + Seek),
    entries: impl Iterator<Item = PointEntry>,
    bloom: &mut Bloom<Vec<u8>>,
) -> Result<(BuildStats, Vec<SSTableIndexEntry>), SSTableError> {
    let mut stats = BuildStats::new();
    let mut index_entries = Vec::new();
    let mut current_block = Vec::<u8>::new();
    let mut block_first_key: Option<Vec<u8>> = None;

    for entry in entries {
        let is_delete = entry.value.is_none();

        stats.record_count += 1;
        if is_delete {
            stats.tombstone_count += 1;
        }
        stats.track(entry.lsn, entry.timestamp);

        // These clones happen once per table and once per block respectively.
        if stats.min_key.is_none() {
            stats.min_key = Some(entry.key.clone());
        }
        if block_first_key.is_none() {
            block_first_key = Some(entry.key.clone());
        }
        bloom.set(&entry.key);

        let cell = SSTableCell {
            key_len: u32::try_from(entry.key.len()).map_err(|_| {
                SSTableError::Internal(format!("key too large: {} bytes", entry.key.len()))
            })?,
            value_len: u32::try_from(entry.value.as_ref().map_or(0, |v| v.len()))
                .map_err(|_| SSTableError::Internal("value too large".into()))?,
            timestamp: entry.timestamp,
            is_delete,
            lsn: entry.lsn,
        };

        // Append header, key and value straight into the block buffer rather
        // than assembling them in a scratch vector and copying a second time.
        let cell_header = encoding::encode_to_vec(&cell)?;
        current_block.extend_from_slice(&cell_header);
        current_block.extend_from_slice(&entry.key);
        if let Some(value) = &entry.value {
            current_block.extend_from_slice(value);
        }

        // The key is no longer needed, so move it instead of cloning it for
        // every record; after the loop this holds the last key seen.
        stats.max_key = Some(entry.key);

        if current_block.len() >= SST_DATA_BLOCK_MAX_SIZE {
            flush_data_block(
                writer,
                &mut current_block,
                &mut block_first_key,
                &mut index_entries,
            )?;
        }
    }

    // Flush whatever is left over after the final entry.
    if !current_block.is_empty() {
        flush_data_block(
            writer,
            &mut current_block,
            &mut block_first_key,
            &mut index_entries,
        )?;
    }

    Ok((stats, index_entries))
}
/// Collects every range tombstone into a single block and writes it as one
/// checksummed block; the block is written even when there are no tombstones.
///
/// Each tombstone's LSN and timestamp are folded into `stats`.
///
/// Returns the block's starting offset and payload length as reported by
/// `write_checksummed_block`.
///
/// # Errors
///
/// Propagates encoding and I/O errors.
fn write_range_tombstones(
    writer: &mut (impl Write + Seek),
    entries: impl Iterator<Item = RangeTombstone>,
    stats: &mut BuildStats,
) -> Result<(u64, usize), SSTableError> {
    let block = SSTableRangeTombstoneDataBlock {
        data: entries
            .map(|tombstone| {
                stats.track(tombstone.lsn, tombstone.timestamp);
                SSTableRangeTombstoneCell {
                    start_key: tombstone.start,
                    end_key: tombstone.end,
                    timestamp: tombstone.timestamp,
                    lsn: tombstone.lsn,
                }
            })
            .collect(),
    };

    let encoded = encoding::encode_to_vec(&block)?;
    write_checksummed_block(writer, &encoded)
}
/// Writes the metaindex block, which maps well-known names to the handles of
/// the bloom filter, properties and range-delete blocks (in that order).
///
/// Returns the block's starting offset and payload length as reported by
/// `write_checksummed_block`.
///
/// # Errors
///
/// Propagates encoding and I/O errors.
fn write_metaindex(
    writer: &mut (impl Write + Seek),
    bloom: BlockHandle,
    properties: BlockHandle,
    range_deletes: BlockHandle,
) -> Result<(u64, usize), SSTableError> {
    let named = |name: &str, handle: BlockHandle| MetaIndexEntry {
        name: name.to_string(),
        handle,
    };
    let meta_entries = vec![
        named("filter.bloom", bloom),
        named("meta.properties", properties),
        named("meta.range_deletes", range_deletes),
    ];

    let mut encoded = Vec::new();
    encoding::encode_vec(&meta_entries, &mut encoded)?;
    write_checksummed_block(writer, &encoded)
}
/// Appends the footer to `file` and syncs the file to disk.
///
/// `total_file_size` is the file length reported by the file's metadata
/// before the footer is written, plus `SST_FOOTER_SIZE`. The footer CRC is
/// computed over the footer encoded with `footer_crc32` set to zero and is
/// then stored in that field before the final encoding is written.
///
/// # Errors
///
/// Propagates metadata, encoding, write and sync errors.
fn write_footer(
    file: &mut File,
    metaindex: BlockHandle,
    index: BlockHandle,
) -> Result<(), SSTableError> {
    let size_before_footer = file.metadata()?.len();
    let mut footer = SSTableFooter {
        metaindex,
        index,
        total_file_size: size_before_footer + SST_FOOTER_SIZE as u64,
        footer_crc32: 0,
    };

    // The checksum covers the encoding in which the CRC field is still zero.
    let zeroed = encoding::encode_to_vec(&footer)?;
    footer.footer_crc32 = super::crc32(&zeroed);
    let encoded = encoding::encode_to_vec(&footer)?;

    {
        // Scoped so the borrow of `file` ends before the sync below.
        let mut writer = BufWriter::new(&mut *file);
        writer.write_all(&encoded)?;
        writer.flush()?;
    }
    file.sync_all()?;
    Ok(())
}
/// Builder that writes an SSTable file to a target path.
///
/// `build` writes to a sibling file with the `tmp` extension and renames it
/// onto the target path once writing has succeeded.
pub struct SstWriter<P: AsRef<Path>> {
/// Final location of the SSTable file.
path: P,
}
impl<P: AsRef<Path>> SstWriter<P> {
    /// Creates a writer that will produce the SSTable at `path`.
    pub fn new(path: P) -> Self {
        Self { path }
    }

    /// Builds the SSTable from the given point entries and range tombstones.
    ///
    /// The table is written to `<path>` with its extension replaced by `tmp`,
    /// synced, and then renamed onto `<path>`. If any step fails, the
    /// temporary file is removed (best effort) before the error is returned,
    /// so failed builds do not leave stale `.tmp` files behind.
    ///
    /// * `point_count` / `range_count` — expected numbers of entries. Their
    ///   sum sizes the bloom filter, and `range_count` is also recorded as
    ///   the range-tombstone count in the properties block.
    ///
    /// # Errors
    ///
    /// Returns `SSTableError::Internal` when both counts are zero and both
    /// iterators are empty, and propagates encoding, bloom-filter and I/O
    /// errors from the underlying writers.
    pub fn build(
        self,
        point_entries: impl Iterator<Item = PointEntry>,
        point_count: usize,
        range_tombstones: impl Iterator<Item = RangeTombstone>,
        range_count: usize,
    ) -> Result<(), SSTableError> {
        let mut point_entries = point_entries.peekable();
        let mut range_tombstones = range_tombstones.peekable();
        if point_count == 0
            && point_entries.peek().is_none()
            && range_count == 0
            && range_tombstones.peek().is_none()
        {
            return Err(SSTableError::Internal(
                "Empty iterators cannot build SSTable".into(),
            ));
        }

        let final_path = self.path.as_ref();
        let tmp_path = final_path.with_extension("tmp");

        let outcome = Self::write_and_publish(
            &tmp_path,
            final_path,
            point_entries,
            point_count,
            range_tombstones,
            range_count,
        );
        if outcome.is_err() {
            // Best effort: the original error is what the caller needs to
            // see, so a failure to clean up is deliberately ignored.
            let _ = std::fs::remove_file(&tmp_path);
        }
        outcome
    }

    /// Writes the complete table to `tmp_path`, syncs it, and renames it to
    /// `final_path`. The file handle is closed when this function returns,
    /// so the caller may delete `tmp_path` after a failure.
    fn write_and_publish(
        tmp_path: &Path,
        final_path: &Path,
        point_entries: impl Iterator<Item = PointEntry>,
        point_count: usize,
        range_tombstones: impl Iterator<Item = RangeTombstone>,
        range_count: usize,
    ) -> Result<(), SSTableError> {
        // NOTE(review): these handles carry the payload length only, whereas
        // the data-block index entries built in `flush_data_block` also count
        // the length prefix and checksum — confirm the reader expects this.
        let to_handle = |(offset, len): (u64, usize)| BlockHandle {
            offset,
            size: len as u64,
        };

        let mut file = OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(tmp_path)?;
        let mut writer = BufWriter::new(&mut file);
        write_header(&mut writer)?;

        // The counts are caller-supplied hints: they may sum to zero while an
        // iterator still yields items, or overflow when added. Presumably the
        // bloom filter cannot be sized for zero items (verify against the
        // bloomfilter crate), so clamp to at least one.
        let expected_items = point_count.saturating_add(range_count).max(1);
        let mut bloom =
            Bloom::new_for_fp_rate(expected_items, SST_BLOOM_FILTER_FALSE_POSITIVE_RATE)
                .map_err(|e| SSTableError::Internal(e.to_string()))?;

        let (mut stats, index_entries) =
            write_data_blocks(&mut writer, point_entries, &mut bloom)?;

        let bloom_block = SSTableBloomBlock {
            data: bloom.as_slice().to_vec(),
        };
        let bloom_bytes = encoding::encode_to_vec(&bloom_block)?;
        let bloom_handle = to_handle(write_checksummed_block(&mut writer, &bloom_bytes)?);

        // Range tombstones go before the properties block because they still
        // contribute LSN/timestamp bounds to `stats`.
        let range_handle = to_handle(write_range_tombstones(
            &mut writer,
            range_tombstones,
            &mut stats,
        )?);

        let properties = stats.into_properties(range_count);
        let props_bytes = encoding::encode_to_vec(&properties)?;
        let props_handle = to_handle(write_checksummed_block(&mut writer, &props_bytes)?);

        let metaindex_handle = to_handle(write_metaindex(
            &mut writer,
            bloom_handle,
            props_handle,
            range_handle,
        )?);

        let mut index_bytes = Vec::new();
        encoding::encode_vec(&index_entries, &mut index_bytes)?;
        let index_handle = to_handle(write_checksummed_block(&mut writer, &index_bytes)?);

        // Everything must be on disk before the footer reads the file length.
        writer.flush()?;
        drop(writer);
        file.sync_all()?;

        write_footer(&mut file, metaindex_handle, index_handle)?;
        rename(tmp_path, final_path)?;
        Ok(())
    }
}