use std::{
fs,
path::PathBuf,
sync::Arc,
};
use bytes::Bytes;
use rand::random;
use tracing::instrument;
use crate::{
errs::SegmentError,
keypair::{
KeyBytes,
ValueBytes,
},
memtable::Memtable,
merge::{
MergeIterator,
RawMergeIterator,
},
raw_entry::RawEntry,
segment::{
DEFAULT_SEGMENT_SIZE,
Segment,
},
segment_builder::SegmentBuilder,
utils::Serializer,
};
/// Result of a compaction run: the freshly written segment plus summary
/// information about what was written into it.
pub struct CompactOutput {
/// The output segment, reopened for reading after the writer was closed.
pub segment: Arc<Segment>,
/// Serialized bytes of the first key written; empty when nothing was written.
pub min_key: Vec<u8>,
/// Serialized bytes of the last key written; empty when nothing was written.
pub max_key: Vec<u8>,
/// Number of entries written to the output segment.
pub entry_count: u64,
}
fn reopen_segment_for_reading(
output_path: &PathBuf,
segment_id: u64,
) -> Result<Arc<Segment>, SegmentError> {
use crate::{
index::Index,
map::Map,
segment::Metadata,
};
let key_id = segment_id;
let val_id = segment_id + 1;
let key_path = output_path.join(key_id.to_string());
let val_path = output_path.join(val_id.to_string());
let key_map = Arc::new(match Map::open(key_path) {
| Ok(v) => v,
| Err(e) => return Err(e),
});
let val_map = Arc::new(match Map::open(val_path) {
| Ok(v) => v,
| Err(e) => return Err(e),
});
let key_metadata = {
let len = key_map.len();
if len < 32 {
return Err(SegmentError::CorruptedBlock);
}
match key_map.read_range(len - 32..len, |slice| {
Metadata::from(Bytes::copy_from_slice(slice))
}) {
| Ok(v) => v,
| Err(e) => return Err(e),
}
};
let val_metadata = {
let len = val_map.len();
if len < 32 {
return Err(SegmentError::CorruptedBlock);
}
match val_map.read_range(len - 32..len, |slice| {
Metadata::from(Bytes::copy_from_slice(slice))
}) {
| Ok(v) => v,
| Err(e) => return Err(e),
}
};
let index_bytes = {
let start = key_metadata.index_start();
let size = key_metadata.index_size();
if key_map.len() < start + size {
return Err(SegmentError::CorruptedBlock);
}
match key_map.read_range(start..start + size, |slice| Bytes::copy_from_slice(slice)) {
| Ok(v) => v,
| Err(e) => return Err(e),
}
};
let key_index = Index::from(index_bytes);
let val_block_count = val_metadata.block_count() as u64;
Segment::open(key_map, key_index, key_id, val_map, val_id, val_block_count)
}
/// Merges `iterators` into a single new segment written under `output_path`.
///
/// For every distinct `(namespace, key)` pair only the first entry yielded by
/// the merge iterator is considered (presumably the newest version — confirm
/// against `MergeIterator`'s ordering). If that entry is a tombstone, nothing
/// is written for the key; later entries for the same key are always skipped.
///
/// After writing, the segment is closed and reopened for reading, and a
/// timing breakdown is logged at `info` level.
///
/// # Errors
///
/// * `SegmentError::IoError` when the output directory cannot be created.
/// * `SegmentError::CantCreateWriter` when the new segment is still shared and
///   cannot be taken out of its `Arc`.
/// * Any error from building, writing, closing or reopening the segment.
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub fn compact<I>(
    iterators: Vec<I>,
    output_path: PathBuf,
    segment_id: u64,
) -> Result<CompactOutput, SegmentError>
where
    I: Iterator<Item = (KeyBytes, ValueBytes)>,
{
    use std::time::{Duration, Instant};

    let start_time = Instant::now();
    // `create_dir_all` creates every missing ancestor as well, so a separate
    // call for the parent directory is unnecessary.
    fs::create_dir_all(&output_path).map_err(SegmentError::IoError)?;
    let setup_time = start_time.elapsed();

    let merge_start = Instant::now();
    let merge_iter = MergeIterator::new(iterators);
    let merge_setup_time = merge_start.elapsed();

    let builder_start = Instant::now();
    let builder = SegmentBuilder::new(output_path.clone())?;
    let seed = random();
    let segment = builder.new_segment(segment_id, seed, DEFAULT_SEGMENT_SIZE)?;
    let builder_time = builder_start.elapsed();

    // Take the segment out of its `Arc`; this only succeeds while no other
    // reference to it exists.
    let seg = Arc::try_unwrap(segment).map_err(|_| {
        SegmentError::CantCreateWriter(crate::segment::BlockType::Key, segment_id)
    })?;

    let mut entry_count = 0u64;
    let mut min_key: Option<Vec<u8>> = None;
    let mut last_key_bytes: Option<Bytes> = None;
    let mut last_key: Option<(u64, Bytes)> = None;
    let mut serialize_time = Duration::ZERO;
    let mut write_time = Duration::ZERO;

    let loop_start = Instant::now();
    for (key, value) in merge_iter {
        let current_key = (key.ns(), key.key().clone());
        // Only the first entry of each logical key is considered.
        if last_key.as_ref() == Some(&current_key) {
            continue;
        }
        last_key = Some(current_key);
        // A leading tombstone drops the key entirely: the tombstone itself is
        // not written and the remaining entries are skipped by the check above.
        if value.is_tombstone() {
            continue;
        }

        let ser_start = Instant::now();
        let key_bytes = key.serialize();
        let val_bytes = value.serialize();
        serialize_time += ser_start.elapsed();

        if min_key.is_none() {
            min_key = Some(key_bytes.to_vec());
        }

        let write_start = Instant::now();
        seg.write(key_bytes.as_ref(), val_bytes.as_ref())?;
        write_time += write_start.elapsed();

        entry_count += 1;
        last_key_bytes = Some(key_bytes);
    }
    let loop_time = loop_start.elapsed();

    let close_start = Instant::now();
    seg.close()?;
    let close_time = close_start.elapsed();
    drop(seg);

    let reopened = reopen_segment_for_reading(&output_path, segment_id)?;

    let total_time = start_time.elapsed();
    // Saturating arithmetic: a diagnostics-only figure must never panic.
    let merge_time = loop_time
        .saturating_sub(serialize_time)
        .saturating_sub(write_time);
    tracing::info!(
        segment_id = segment_id,
        entries = entry_count,
        total_ms = total_time.as_millis(),
        setup_ms = setup_time.as_millis(),
        merge_setup_ms = merge_setup_time.as_millis(),
        builder_ms = builder_time.as_millis(),
        loop_ms = loop_time.as_millis(),
        merge_ms = merge_time.as_millis(),
        serialize_ms = serialize_time.as_millis(),
        write_ms = write_time.as_millis(),
        close_ms = close_time.as_millis(),
        "Compaction timing breakdown"
    );

    Ok(CompactOutput {
        segment: reopened,
        min_key: min_key.unwrap_or_default(),
        max_key: last_key_bytes.map(|last| last.to_vec()).unwrap_or_default(),
        entry_count,
    })
}
/// Merges already-serialized entries from `iterators` into a single new
/// segment written under `output_path`, without re-serializing keys or values.
///
/// Entries are grouped by `RawEntry::dedup_key`. For each group only the first
/// entry yielded by the merge iterator is considered; if it is a tombstone,
/// nothing is written for that key. Later entries of the group are skipped.
///
/// Entries for which the merge iterator yields an error are skipped on a
/// best-effort basis; each one is logged at `warn` level and the total is
/// reported in the final timing log.
///
/// # Errors
///
/// * `SegmentError::IoError` when the output directory cannot be created.
/// * `SegmentError::CantCreateWriter` when the new segment is still shared and
///   cannot be taken out of its `Arc`.
/// * Any error from building, writing, closing or reopening the segment.
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub(crate) fn compact_raw<I>(
    iterators: Vec<I>,
    output_path: PathBuf,
    segment_id: u64,
) -> Result<CompactOutput, SegmentError>
where
    I: Iterator<Item = Result<RawEntry, SegmentError>>,
{
    use std::time::{Duration, Instant};

    let start_time = Instant::now();
    // `create_dir_all` creates every missing ancestor as well, so a separate
    // call for the parent directory is unnecessary.
    fs::create_dir_all(&output_path).map_err(SegmentError::IoError)?;

    let merge_iter = RawMergeIterator::new(iterators);
    let builder = SegmentBuilder::new(output_path.clone())?;
    let seed = random();
    let segment = builder.new_segment(segment_id, seed, DEFAULT_SEGMENT_SIZE)?;

    // Take the segment out of its `Arc`; this only succeeds while no other
    // reference to it exists.
    let seg = Arc::try_unwrap(segment).map_err(|_| {
        SegmentError::CantCreateWriter(crate::segment::BlockType::Key, segment_id)
    })?;

    let mut entry_count = 0u64;
    let mut skipped_entries = 0u64;
    let mut min_key: Option<Vec<u8>> = None;
    let mut last_key_bytes: Option<Bytes> = None;
    let mut last_dedup_key: Option<Bytes> = None;
    let mut write_time = Duration::ZERO;

    let loop_start = Instant::now();
    for result in merge_iter {
        let entry = match result {
            | Ok(entry) => entry,
            | Err(e) => {
                // Best-effort: keep compacting, but never drop data silently.
                // NOTE(review): consider propagating this error instead — the
                // skipped entry is missing from the compacted output.
                skipped_entries += 1;
                tracing::warn!(
                    segment_id = segment_id,
                    error = ?e,
                    "Skipping unreadable entry during raw compaction"
                );
                continue;
            },
        };

        let current_dedup = entry.dedup_key();
        let is_new_key = match &last_dedup_key {
            | None => true,
            | Some(prev) => prev.as_ref() != current_dedup,
        };
        // Only the first entry of each logical key is considered.
        if !is_new_key {
            continue;
        }

        let raw_key = entry.raw_key();
        // Remember the key without its trailing 16 bytes (presumably the
        // version suffix excluded by `dedup_key` — TODO confirm). `slice`
        // shares the underlying buffer instead of copying it.
        last_dedup_key = Some(raw_key.slice(..raw_key.len() - 16));

        // A leading tombstone drops the key entirely: the tombstone itself is
        // not written and the remaining entries are skipped by the check above.
        if entry.is_tombstone() {
            continue;
        }

        let raw_val = entry.raw_val();
        if min_key.is_none() {
            min_key = Some(raw_key.to_vec());
        }

        let write_start = Instant::now();
        seg.write(raw_key.as_ref(), raw_val.as_ref())?;
        write_time += write_start.elapsed();

        entry_count += 1;
        last_key_bytes = Some(raw_key.clone());
    }
    let loop_time = loop_start.elapsed();

    let close_start = Instant::now();
    seg.close()?;
    let close_time = close_start.elapsed();
    drop(seg);

    let reopened = reopen_segment_for_reading(&output_path, segment_id)?;

    let total_time = start_time.elapsed();
    tracing::info!(
        segment_id = segment_id,
        entries = entry_count,
        skipped = skipped_entries,
        total_ms = total_time.as_millis(),
        loop_ms = loop_time.as_millis(),
        write_ms = write_time.as_millis(),
        close_ms = close_time.as_millis(),
        "Raw compaction timing breakdown (zero-copy + deferred index)"
    );

    Ok(CompactOutput {
        segment: reopened,
        min_key: min_key.unwrap_or_default(),
        max_key: last_key_bytes.map(|last| last.to_vec()).unwrap_or_default(),
        entry_count,
    })
}
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub fn flush_memtable(
memtable: Arc<Memtable>,
output_path: PathBuf,
segment_id: u64,
) -> Result<(Arc<Segment>, Vec<u8>, Vec<u8>), SegmentError> {
if let Some(parent) = output_path.parent() {
if let Err(e) = fs::create_dir_all(parent) {
return Err(SegmentError::IoError(e));
}
}
if let Err(e) = fs::create_dir_all(&output_path) {
return Err(SegmentError::IoError(e));
}
let builder = match SegmentBuilder::new(output_path.clone()) {
| Ok(v) => v,
| Err(e) => return Err(e),
};
let seed = random();
let segment = match builder.new_segment(segment_id, seed, DEFAULT_SEGMENT_SIZE) {
| Ok(s) => s,
| Err(e) => return Err(e),
};
let mut entry_count = 0u64;
let segment_mut = match Arc::try_unwrap(segment) {
| Ok(s) => s,
| Err(_) => {
return Err(SegmentError::CantCreateWriter(
crate::segment::BlockType::Key,
segment_id,
));
},
};
let seg = segment_mut;
let mut min_key: Option<Vec<u8>> = None;
let mut max_key: Option<Vec<u8>> = None;
let mut last_key: Option<KeyBytes> = None;
let mut last_val: Option<ValueBytes> = None;
use std::collections::Bound;
let iter = memtable.scan(Bound::Unbounded, Bound::Unbounded);
for (key, value) in iter {
let same_logical_key = match &last_key {
| Some(prev) => prev.ns() == key.ns() && prev.as_bytes() == key.as_bytes(),
| None => false,
};
if same_logical_key {
continue;
} else {
if let (Some(prev_key), Some(prev_val)) = (last_key.take(), last_val.take()) {
let key_bytes = prev_key.serialize();
let val_bytes = prev_val.serialize();
if min_key.is_none() {
min_key = Some(key_bytes.to_vec());
}
if let Err(e) = seg.write(key_bytes.as_ref(), val_bytes.as_ref()) {
return Err(e);
}
entry_count += 1;
max_key = Some(key_bytes.to_vec());
}
last_key = Some(key);
last_val = Some(value);
}
}
if let (Some(prev_key), Some(prev_val)) = (last_key, last_val) {
let key_bytes = prev_key.serialize();
let val_bytes = prev_val.serialize();
if min_key.is_none() {
min_key = Some(key_bytes.to_vec());
}
if let Err(e) = seg.write(key_bytes.as_ref(), val_bytes.as_ref()) {
return Err(e);
}
entry_count += 1;
max_key = Some(key_bytes.to_vec());
}
if let Err(e) = seg.close() {
return Err(e);
}
tracing::info!(
memtable_id = memtable.id(),
segment_id = segment_id,
entries = entry_count,
"Memtable flush complete"
);
drop(seg);
let key_id = segment_id;
let val_id = segment_id + 1;
use crate::{
index::Index,
map::Map,
};
let key_path = output_path.join(key_id.to_string());
let val_path = output_path.join(val_id.to_string());
let key_map = Arc::new(match Map::open(key_path) {
| Ok(v) => v,
| Err(e) => return Err(e),
});
let val_map = Arc::new(match Map::open(val_path) {
| Ok(v) => v,
| Err(e) => return Err(e),
});
use crate::segment::Metadata;
let key_metadata = {
let len = key_map.len();
if len < 32 {
return Err(SegmentError::CorruptedBlock);
}
match key_map.read_range(len - 32..len, |slice| {
Metadata::from(Bytes::copy_from_slice(slice))
}) {
| Ok(v) => v,
| Err(e) => return Err(e),
}
};
let val_metadata = {
let len = val_map.len();
if len < 32 {
return Err(SegmentError::CorruptedBlock);
}
match val_map.read_range(len - 32..len, |slice| {
Metadata::from(Bytes::copy_from_slice(slice))
}) {
| Ok(v) => v,
| Err(e) => return Err(e),
}
};
let index_bytes = {
let start = key_metadata.index_start();
let size = key_metadata.index_size();
if key_map.len() < start + size {
return Err(SegmentError::CorruptedBlock);
}
match key_map.read_range(start..start + size, |slice| Bytes::copy_from_slice(slice)) {
| Ok(v) => v,
| Err(e) => return Err(e),
}
};
let key_index = Index::from(index_bytes);
let val_block_count = val_metadata.block_count() as u64;
let segment = match Segment::open(key_map, key_index, key_id, val_map, val_id, val_block_count)
{
| Ok(s) => s,
| Err(e) => return Err(e),
};
Ok((
segment,
min_key.unwrap_or_default(),
max_key.unwrap_or_default(),
))
}
// Unit tests for `compact` and `flush_memtable`. Every test writes its output
// into a fresh temporary directory created with `tempdir()`.
#[cfg(test)]
mod tests {
use std::collections::Bound;
use bytes::Bytes;
use tempfile::tempdir;
use super::*;
use crate::{
hlc::{
HLC,
HybridLogicalClock,
},
keypair::{
DEFAULT_NS,
KeyBytes,
ValueBytes,
},
memtable::Memtable,
};
// Compacting an empty list of input iterators must succeed.
#[test]
fn test_compact_empty_iterators() {
let dir = tempdir().unwrap();
let output_path = dir.path().join("compacted.segment");
let empty: Vec<Vec<(KeyBytes, ValueBytes)>> = vec![];
let iters = empty.into_iter().map(IntoIterator::into_iter).collect();
let result = compact(iters, output_path, 1);
assert!(result.is_ok(), "Compacting empty iterators should succeed");
}
// Compacting one memtable holding ten distinct keys yields a read-only segment.
#[test]
fn test_compact_single_memtable() {
let dir = tempdir().unwrap();
let output_path = dir.path().join("compacted.segment");
let clock = HybridLogicalClock::new();
let memtable = Memtable::new(1, 1024 * 1024);
for i in 0..10 {
let key = KeyBytes::new(DEFAULT_NS, Bytes::from(format!("key-{}", i)), clock.time());
let val = ValueBytes::new(DEFAULT_NS, Bytes::from(format!("value-{}", i)));
memtable.put(key, val).unwrap();
}
let iter = memtable.scan(Bound::Unbounded, Bound::Unbounded);
let segment = compact(vec![iter], output_path, 1).unwrap();
assert!(segment.segment.is_read_only());
}
// Compacting two memtables that both contain "key1" yields a read-only segment.
// NOTE(review): only the read-only flag is asserted; the merged contents are
// not inspected here.
#[test]
fn test_compact_multiple_memtables() {
let dir = tempdir().unwrap();
let output_path = dir.path().join("compacted.segment");
let clock = HybridLogicalClock::new();
let memtable1 = Memtable::new(1, 1024 * 1024);
let memtable2 = Memtable::new(2, 1024 * 1024);
memtable1
.put(
KeyBytes::new(DEFAULT_NS, Bytes::from("key1"), clock.time()),
ValueBytes::new(DEFAULT_NS, Bytes::from("value1_v2")),
)
.unwrap();
memtable1
.put(
KeyBytes::new(DEFAULT_NS, Bytes::from("key2"), clock.time()),
ValueBytes::new(DEFAULT_NS, Bytes::from("value2_v1")),
)
.unwrap();
memtable2
.put(
KeyBytes::new(DEFAULT_NS, Bytes::from("key1"), clock.time()),
ValueBytes::new(DEFAULT_NS, Bytes::from("value1_v3")),
)
.unwrap();
memtable2
.put(
KeyBytes::new(DEFAULT_NS, Bytes::from("key3"), clock.time()),
ValueBytes::new(DEFAULT_NS, Bytes::from("value3_v1")),
)
.unwrap();
let iter1 = memtable1.scan(Bound::Unbounded, Bound::Unbounded);
let iter2 = memtable2.scan(Bound::Unbounded, Bound::Unbounded);
let segment = compact(vec![iter1, iter2], output_path, 1).unwrap();
assert!(segment.segment.is_read_only());
}
// Compacts two versions of the same key that live in different memtables.
// NOTE(review): despite the name, only the read-only flag is asserted; the
// version order itself is not checked.
#[test]
fn test_compact_preserves_version_order() {
let dir = tempdir().unwrap();
let output_path = dir.path().join("compacted.segment");
let clock = HybridLogicalClock::new();
let memtable1 = Memtable::new(1, 1024 * 1024);
let memtable2 = Memtable::new(2, 1024 * 1024);
let key_name = Bytes::from("versioned-key");
memtable1
.put(
KeyBytes::new(DEFAULT_NS, key_name.clone(), clock.time()),
ValueBytes::new(DEFAULT_NS, Bytes::from("v1")),
)
.unwrap();
memtable2
.put(
KeyBytes::new(DEFAULT_NS, key_name.clone(), clock.time()),
ValueBytes::new(DEFAULT_NS, Bytes::from("v2")),
)
.unwrap();
let iter1 = memtable1.scan(Bound::Unbounded, Bound::Unbounded);
let iter2 = memtable2.scan(Bound::Unbounded, Bound::Unbounded);
let segment = compact(vec![iter1, iter2], output_path, 1).unwrap();
assert!(segment.segment.is_read_only());
}
// Flushing a frozen memtable with ten distinct keys yields a read-only segment.
#[test]
fn test_flush_memtable_basic() {
let dir = tempdir().unwrap();
let output_path = dir.path().join("flushed.segment");
let clock = HybridLogicalClock::new();
let memtable = Arc::new(Memtable::new(1, 1024 * 1024));
for i in 0..10 {
let key = KeyBytes::new(DEFAULT_NS, Bytes::from(format!("key-{}", i)), clock.time());
let val = ValueBytes::new(DEFAULT_NS, Bytes::from(format!("value-{}", i)));
memtable.put(key, val).unwrap();
}
memtable.freeze();
let (segment, _min_key, _max_key) =
flush_memtable(memtable.clone(), output_path, 1).unwrap();
assert!(segment.is_read_only());
}
// Flushes a memtable holding a value followed by a tombstone for the same key.
// NOTE(review): only the read-only flag is asserted; the tombstone's presence
// in the flushed segment is not checked here.
#[test]
fn test_flush_memtable_preserves_tombstones() {
let dir = tempdir().unwrap();
let output_path = dir.path().join("flushed.segment");
let clock = HybridLogicalClock::new();
let memtable = Arc::new(Memtable::new(1, 1024 * 1024));
let key = KeyBytes::new(DEFAULT_NS, Bytes::from("key-to-delete"), clock.time());
let val = ValueBytes::new(DEFAULT_NS, Bytes::from("value"));
memtable.put(key.clone(), val).unwrap();
let tombstone_key = KeyBytes::new(DEFAULT_NS, Bytes::from("key-to-delete"), clock.time());
let tombstone = ValueBytes::new_tombstone(DEFAULT_NS);
memtable.put(tombstone_key, tombstone).unwrap();
memtable.freeze();
let (segment, _min_key, _max_key) =
flush_memtable(memtable.clone(), output_path, 1).unwrap();
assert!(segment.is_read_only());
}
// Flushing a frozen memtable with no entries still produces a read-only segment.
#[test]
fn test_flush_empty_memtable() {
let dir = tempdir().unwrap();
let output_path = dir.path().join("flushed.segment");
let memtable = Arc::new(Memtable::new(1, 1024 * 1024));
memtable.freeze();
let (segment, _min_key, _max_key) =
flush_memtable(memtable.clone(), output_path, 1).unwrap();
assert!(segment.is_read_only());
}
// Flushes a memtable that holds three versions of one key.
// NOTE(review): only the read-only flag is asserted; which version survives is
// not checked.
#[test]
fn test_flush_memtable_with_multiple_versions() {
let dir = tempdir().unwrap();
let output_path = dir.path().join("flushed.segment");
let clock = HybridLogicalClock::new();
let memtable = Arc::new(Memtable::new(1, 1024 * 1024));
let key_name = Bytes::from("versioned-key");
memtable
.put(
KeyBytes::new(DEFAULT_NS, key_name.clone(), clock.time()),
ValueBytes::new(DEFAULT_NS, Bytes::from("v1")),
)
.unwrap();
memtable
.put(
KeyBytes::new(DEFAULT_NS, key_name.clone(), clock.time()),
ValueBytes::new(DEFAULT_NS, Bytes::from("v2")),
)
.unwrap();
memtable
.put(
KeyBytes::new(DEFAULT_NS, key_name.clone(), clock.time()),
ValueBytes::new(DEFAULT_NS, Bytes::from("v3")),
)
.unwrap();
memtable.freeze();
let (segment, _min_key, _max_key) =
flush_memtable(memtable.clone(), output_path, 1).unwrap();
assert!(segment.is_read_only());
}
// Round trip: flush two keys, drop the returned segment, reopen it through
// `SegmentBuilder::open` and verify that a full scan yields both entries.
#[test]
fn test_flush_reopen_simple() {
let dir = tempdir().unwrap();
let output_path = dir.path().join("flush-simple.segment");
let clock = HybridLogicalClock::new();
let memtable = Arc::new(Memtable::new(1, 1024 * 1024));
memtable
.put(
KeyBytes::new(DEFAULT_NS, Bytes::from("key1"), clock.time()),
ValueBytes::new(DEFAULT_NS, Bytes::from("value1")),
)
.unwrap();
memtable
.put(
KeyBytes::new(DEFAULT_NS, Bytes::from("key2"), clock.time()),
ValueBytes::new(DEFAULT_NS, Bytes::from("value2")),
)
.unwrap();
memtable.freeze();
let (segment, _min_key, _max_key) =
flush_memtable(memtable, output_path.clone(), 1).unwrap();
assert!(segment.is_read_only());
drop(segment);
let builder = SegmentBuilder::new(output_path).unwrap();
let reopened = builder.open(1).unwrap();
let reader = reopened.new_reader().unwrap();
let mut count = 0;
for result in reader.scan(Bound::Unbounded, Bound::Unbounded) {
let (_key, _value) = result.unwrap();
count += 1;
}
assert_eq!(count, 2, "Should have 2 entries");
}
// Round trip: compact two keys, drop the returned segment, reopen it through
// `SegmentBuilder::open` and verify that a full scan yields both entries.
#[test]
fn test_compact_reopen_simple() {
let dir = tempdir().unwrap();
let output_path = dir.path().join("simple.segment");
let clock = HybridLogicalClock::new();
let memtable = Memtable::new(1, 1024 * 1024);
memtable
.put(
KeyBytes::new(DEFAULT_NS, Bytes::from("key1"), clock.time()),
ValueBytes::new(DEFAULT_NS, Bytes::from("value1")),
)
.unwrap();
memtable
.put(
KeyBytes::new(DEFAULT_NS, Bytes::from("key2"), clock.time()),
ValueBytes::new(DEFAULT_NS, Bytes::from("value2")),
)
.unwrap();
let iter = memtable.scan(Bound::Unbounded, Bound::Unbounded);
let segment = compact(vec![iter], output_path.clone(), 1).unwrap();
assert!(segment.segment.is_read_only());
drop(segment);
let builder = SegmentBuilder::new(output_path).unwrap();
let reopened = builder.open(1).unwrap();
let reader = reopened.new_reader().unwrap();
let mut count = 0;
for result in reader.scan(Bound::Unbounded, Bound::Unbounded) {
let (_key, _value) = result.unwrap();
count += 1;
}
assert_eq!(count, 2, "Should have 2 entries");
}
// "key1" has a value in memtable1 and a later-written tombstone in memtable2;
// "key2" has a plain value. After compaction the reopened segment must contain
// no tombstones and exactly one entry.
#[test]
fn test_compact_filters_tombstones() {
let dir = tempdir().unwrap();
let output_path = dir.path().join("compacted.segment");
let clock = HybridLogicalClock::new();
let memtable1 = Memtable::new(1, 1024 * 1024);
let memtable2 = Memtable::new(2, 1024 * 1024);
memtable1
.put(
KeyBytes::new(DEFAULT_NS, Bytes::from("key1"), clock.time()),
ValueBytes::new(DEFAULT_NS, Bytes::from("value1")),
)
.unwrap();
memtable2
.put(
KeyBytes::new(DEFAULT_NS, Bytes::from("key1"), clock.time()),
ValueBytes::new_tombstone(DEFAULT_NS),
)
.unwrap();
memtable2
.put(
KeyBytes::new(DEFAULT_NS, Bytes::from("key2"), clock.time()),
ValueBytes::new(DEFAULT_NS, Bytes::from("value2")),
)
.unwrap();
let iter1 = memtable1.scan(Bound::Unbounded, Bound::Unbounded);
let iter2 = memtable2.scan(Bound::Unbounded, Bound::Unbounded);
// `compact` takes ownership of the path, so keep a copy for reopening.
let output_path_clone = output_path.clone();
let segment = compact(vec![iter1, iter2], output_path, 1).unwrap();
assert!(segment.segment.is_read_only());
drop(segment);
let builder = SegmentBuilder::new(output_path_clone).unwrap();
let reopened = builder.open(1).unwrap();
let reader = reopened.new_reader().unwrap();
let mut count = 0;
for result in reader.scan(Bound::Unbounded, Bound::Unbounded) {
let (_key, value) = result.unwrap();
assert!(
!value.is_tombstone(),
"Tombstones should be filtered during compaction"
);
count += 1;
}
assert_eq!(
count, 1,
"Should only have one non-tombstone entry after compaction"
);
}
// Contrasts the two write paths on a memtable holding a single tombstone:
// `flush_memtable` must keep the tombstone, `compact` must drop it.
#[test]
fn test_flush_vs_compact_tombstone_handling() {
let dir = tempdir().unwrap();
let clock = HybridLogicalClock::new();
let memtable = Arc::new(Memtable::new(1, 1024 * 1024));
memtable
.put(
KeyBytes::new(DEFAULT_NS, Bytes::from("deleted-key"), clock.time()),
ValueBytes::new_tombstone(DEFAULT_NS),
)
.unwrap();
memtable.freeze();
// Flush path: the tombstone must still be present after reopening.
let flush_path = dir.path().join("flushed.segment");
let (flushed_segment, _min_key, _max_key) =
flush_memtable(memtable.clone(), flush_path.clone(), 1).unwrap();
drop(flushed_segment);
let builder = SegmentBuilder::new(flush_path).unwrap();
let reopened_flush = builder.open(1).unwrap();
let flush_reader = reopened_flush.new_reader().unwrap();
let mut found_tombstone = false;
for result in flush_reader.scan(Bound::Unbounded, Bound::Unbounded) {
let (_key, value) = result.unwrap();
if value.is_tombstone() {
found_tombstone = true;
}
}
assert!(found_tombstone, "Flush should preserve tombstones for L0");
// Compaction path: the same memtable is compacted into a separate directory
// with a different segment id, and no tombstone may remain.
let compact_path = dir.path().join("compacted.segment");
let iter = memtable.scan(Bound::Unbounded, Bound::Unbounded);
let compacted_segment = compact(vec![iter], compact_path.clone(), 2).unwrap();
drop(compacted_segment);
let builder2 = SegmentBuilder::new(compact_path).unwrap();
let reopened_compact = builder2.open(2).unwrap();
let compact_reader = reopened_compact.new_reader().unwrap();
let mut found_tombstone_in_compact = false;
for result in compact_reader.scan(Bound::Unbounded, Bound::Unbounded) {
let (_key, value) = result.unwrap();
if value.is_tombstone() {
found_tombstone_in_compact = true;
}
}
assert!(
!found_tombstone_in_compact,
"Compact should filter tombstones"
);
}
}