use std::{
fmt::Display,
mem,
ptr,
sync::{
Arc,
atomic::{
AtomicBool,
AtomicU64,
Ordering::Relaxed,
},
},
};
use bytes::{
BufMut,
Bytes,
BytesMut,
};
use parking_lot::Mutex;
use crate::{
block::{
BLOCK_SIZE,
Block,
EntryFlag,
EntryFlag::{
Complete,
End,
Middle,
Start,
},
MAX_ENTRY_SIZE,
},
errs::{
SegmentError,
SegmentError::{
CantCreateReader,
ReadOnly,
},
},
index::Index,
keypair::DEFAULT_NS,
map::Map,
segment::BlockType::{
Key,
Value,
},
segment_reader::{
ReadConfig,
SegmentReader,
},
segment_writer::SegmentWriter,
};
/// Size in bytes of the value-location prefix that `Segment::write` puts in front of every
/// key entry: a `u64` value block number followed by a `u16` entry index (10 bytes).
pub(crate) const VALUE_LOCATION_SIZE: usize = size_of::<u64>() + size_of::<u16>();
// Offset of the `u64` value block number inside the value-location prefix.
// NOTE(review): not referenced in the visible part of this file — confirm it is still needed.
const VALUE_BLOCK_OFFSET: usize = 0;
// Offset of the `u16` value entry index inside the value-location prefix (right after the
// `u64` block number).
// NOTE(review): not referenced in the visible part of this file — confirm it is still needed.
const VALUE_ENTRY_OFFSET: usize = size_of::<u64>();
/// Offset at which the caller-supplied key bytes start inside a stored key entry, i.e.
/// immediately after the value-location prefix.
pub(crate) const KEY_DATA_OFFSET: usize = VALUE_LOCATION_SIZE;
/// Default segment size: 64 MiB.
pub(crate) const DEFAULT_SEGMENT_SIZE: u64 = 64 * 1024 * 1024;
// NOTE(review): presumably the file size from which files are preallocated; it is not used
// in the visible part of this file — confirm against the writer/builder code.
const PREALLOCATED_FILE_THRESHOLD: u64 = DEFAULT_SEGMENT_SIZE;
// Serialized size of `Metadata`: four `u64` fields (32 bytes). Same expression as
// `Metadata::serialized_size`.
const METADATA_SIZE: usize = 4 * size_of::<u64>();
/// Selects which of a segment's two block streams an operation targets.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockType {
    /// The stream of blocks that stores key entries.
    Key,
    /// The stream of blocks that stores value entries.
    Value,
}

// Renders the block type as its lowercase name: `"key"` or `"value"`.
impl Display for BlockType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The variants are spelled through `Self::` on purpose: a bare `Key` pattern only
        // names the variant while a `use ...::BlockType::{Key, Value}` import is in scope;
        // without it the identifier would be read as a catch-all binding.
        let name = match self {
            | Self::Key => "key",
            | Self::Value => "value",
        };
        write!(f, "{}", name)
    }
}
/// Per-file metadata record handed to the segment writer when a segment is closed.
///
/// The serialized form is four consecutive little-endian `u64` values, in field order:
/// `id`, `block_count`, `index_size`, `index_start` (32 bytes in total).
#[derive(Debug)]
pub(crate) struct Metadata {
    /// Identifier of the file this record describes.
    id: u64,
    /// Number of blocks reported by the writer for this file.
    block_count: u64,
    /// Size of the index as reported by `Index::size()`; `0` for value files.
    index_size: u64,
    /// Value returned by the writer's `write_index`; `0` for value files.
    index_start: u64,
}

impl Metadata {
    /// Builds a metadata record from its four raw fields.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
    pub(crate) fn new(id: u64, block_count: u64, index_size: u64, index_start: u64) -> Self {
        Self {
            id,
            block_count,
            index_size,
            index_start,
        }
    }

    /// Returns the number of bytes [`Metadata::finalize`] writes (four `u64`s, 32 bytes).
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
    pub(crate) fn serialized_size(&self) -> usize {
        4 * size_of::<u64>()
    }

    /// Serializes the record into the memory starting at `dst`.
    ///
    /// The fields are written as little-endian `u64`s in declaration order. The bytes are
    /// copied one field at a time with byte-wise copies, so `dst` does **not** need any
    /// particular alignment.
    ///
    /// # Safety
    ///
    /// `dst` must be non-null and valid for writes of `self.serialized_size()` bytes, and
    /// that memory must not be accessed concurrently for the duration of the call.
    pub(crate) unsafe fn finalize(&self, dst: *mut u8) {
        debug_assert!(!dst.is_null(), "Destination pointer must not be null");
        let fields = [self.id, self.block_count, self.index_size, self.index_start];
        for (slot, field) in fields.iter().enumerate() {
            let encoded = field.to_le_bytes();
            // SAFETY: the caller guarantees `dst` is valid for `serialized_size()` bytes;
            // `slot` is at most 3, so the 8 bytes written at `slot * 8` stay inside that
            // range. `encoded` is a local array and cannot overlap the destination, and a
            // `u8` copy has no alignment requirement.
            unsafe {
                ptr::copy_nonoverlapping(
                    encoded.as_ptr(),
                    dst.add(slot * size_of::<u64>()),
                    encoded.len(),
                );
            }
        }
    }

    /// Returns the identifier of the described file.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
    pub(crate) fn id(&self) -> u64 {
        self.id
    }

    /// Returns the block count, converted to `usize`.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
    pub(crate) fn block_count(&self) -> usize {
        self.block_count as usize
    }

    /// Returns the index size, converted to `usize`.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
    pub(crate) fn index_size(&self) -> usize {
        self.index_size as usize
    }

    /// Returns the index start, converted to `usize`.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
    pub(crate) fn index_start(&self) -> usize {
        self.index_start as usize
    }
}
// Serializes a `Metadata` record into an owned 32-byte buffer: the four fields as
// little-endian `u64`s in declaration order (same layout `Metadata::finalize` produces).
impl From<Metadata> for Bytes {
    fn from(metadata: Metadata) -> Bytes {
        let size = metadata.serialized_size();
        let mut buffer = BytesMut::with_capacity(size);
        // Safe appends replace the former raw-pointer write: no `unsafe` is needed here and
        // the result no longer depends on how the heap allocation happens to be aligned.
        buffer.put_u64_le(metadata.id);
        buffer.put_u64_le(metadata.block_count);
        buffer.put_u64_le(metadata.index_size);
        buffer.put_u64_le(metadata.index_start);
        debug_assert_eq!(buffer.len(), size, "serialized metadata has an unexpected length");
        buffer.freeze()
    }
}
// Decodes a `Metadata` record from its serialized form: four little-endian `u64`s read from
// the first 32 bytes of `bytes`. Any bytes after the first 32 are ignored.
//
// Panics if `bytes` holds fewer than 32 bytes.
impl From<Bytes> for Metadata {
    fn from(bytes: Bytes) -> Self {
        assert!(bytes.len() >= 32, "Metadata requires at least 32 bytes");
        // Reads the little-endian `u64` that starts at byte `start`.
        let field_at = |start: usize| -> u64 {
            let raw: [u8; 8] = bytes[start..start + 8].try_into().unwrap();
            u64::from_le_bytes(raw)
        };
        Self {
            id: field_at(0),
            block_count: field_at(8),
            index_size: field_at(16),
            index_start: field_at(24),
        }
    }
}
// Contents of a block that has not been written yet: `.0` holds the cumulative end offset of
// every entry added so far, `.1` holds the concatenated entries (one flag byte followed by
// the entry payload).
type BlockAccumulator = (Vec<u16>, Vec<u8>);
/// A segment: one file of key blocks and one file of value blocks, plus the key index.
///
/// A segment built with `Segment::new` owns two writers and accepts writes until it is
/// closed; a segment built with `Segment::open` has no writers and is read-only.
pub struct Segment {
// Writer for the key file; `None` when the segment is read-only (opened or closed).
key_writer: Mutex<Option<SegmentWriter>>,
// Mapping of the key file supplied to `open`; `None` for segments created with `new`.
key_handle: Option<Arc<Map>>,
// Index over the key blocks; shared with the readers created from this segment.
key_index: Arc<parking_lot::RwLock<Index>>,
// Key entries accumulated for the next key block.
current_key_entries: Mutex<BlockAccumulator>,
// Identifier of the key file; also returned by `id()`.
key_id: u64,
// Writer for the value file; `None` when the segment is read-only.
val_writer: Mutex<Option<SegmentWriter>>,
// Mapping of the value file supplied to `open`; `None` for segments created with `new`.
val_handle: Option<Arc<Map>>,
// Value entries accumulated for the next value block.
current_val_entries: Mutex<BlockAccumulator>,
// Number of value blocks written so far; used as the block number stored with each key.
val_block_count: AtomicU64,
// Identifier of the value file.
val_id: u64,
// Namespace (first 8 key bytes, little-endian) of the most recently written key.
current_ns: AtomicU64,
// Byte counter: starts at the combined mapping length in `open` (0 in `new`) and grows by
// `BLOCK_SIZE` for every block written through `write_block`.
bytes_written: AtomicU64,
}
impl Segment {
/// Creates a writable segment on top of the two supplied writers.
///
/// The key index is created from `key_id` and `seed` and gets an offset entry for
/// `DEFAULT_NS`; both block accumulators start empty and all counters start at zero.
pub fn new(
    key_id: u64,
    val_id: u64,
    seed: i64,
    key_writer: SegmentWriter,
    val_writer: SegmentWriter,
) -> Self {
    let index = {
        let mut fresh = Index::new(key_id, seed);
        fresh.insert_ns_offset(DEFAULT_NS);
        fresh
    };
    // Both accumulators start out as a pair of empty vectors.
    let empty_accumulator = || Mutex::new((Vec::new(), Vec::new()));
    Self {
        key_id,
        key_writer: Mutex::new(Some(key_writer)),
        key_handle: None,
        key_index: Arc::new(parking_lot::RwLock::new(index)),
        current_key_entries: empty_accumulator(),
        val_id,
        val_writer: Mutex::new(Some(val_writer)),
        val_handle: None,
        current_val_entries: empty_accumulator(),
        val_block_count: AtomicU64::new(0),
        current_ns: AtomicU64::new(DEFAULT_NS),
        bytes_written: AtomicU64::new(0),
    }
}
/// Wraps already-written key and value files in a read-only segment.
///
/// No writers are attached, so `is_read_only` returns `true` for the result. The byte
/// counter starts at the combined length of the two mappings and the value block counter
/// at `val_block_count`. The visible implementation never returns `Err`.
pub fn open(
    key_map: Arc<Map>,
    key_index: Index,
    key_id: u64,
    val_map: Arc<Map>,
    val_id: u64,
    val_block_count: u64,
) -> Result<Arc<Segment>, SegmentError> {
    // Measure the mappings before they are moved into the segment.
    let mapped_bytes = key_map.len() as u64 + val_map.len() as u64;
    let empty_accumulator = || Mutex::new((Vec::new(), Vec::new()));
    let segment = Segment {
        key_id,
        key_writer: Mutex::new(None),
        key_handle: Some(key_map),
        key_index: Arc::new(parking_lot::RwLock::new(key_index)),
        current_key_entries: empty_accumulator(),
        val_id,
        val_writer: Mutex::new(None),
        val_handle: Some(val_map),
        current_val_entries: empty_accumulator(),
        val_block_count: AtomicU64::new(val_block_count),
        current_ns: AtomicU64::new(DEFAULT_NS),
        bytes_written: AtomicU64::new(mapped_bytes),
    };
    Ok(Arc::new(segment))
}
/// Returns `true` when the segment has no key writer, which is the case for segments
/// created with `open` and for segments that have been closed.
pub fn is_read_only(&self) -> bool {
self.key_writer.lock().is_none()
}
/// Appends one key/value pair to the segment.
///
/// The value is added to the value stream first; the block number and entry index it was
/// given are then prepended to `key` (as a little-endian `u64` and `u16`) and the result is
/// added to the key stream. When the namespace in the first 8 bytes of `key` differs from
/// the namespace of the previous write, a namespace offset is recorded in the key index.
///
/// # Errors
///
/// Returns `ReadOnly` when the segment has no writer, and forwards any error from adding
/// the value or the key entry.
///
/// # Panics
///
/// Panics if `key` is shorter than 8 bytes. In debug builds it also panics — before anything
/// is stored — if `key` is shorter than 24 bytes.
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub fn write(&self, key: &[u8], val: &[u8]) -> Result<(), SegmentError> {
    if self.key_writer.lock().is_none() {
        return Err(ReadOnly);
    }
    // Check the key layout before touching any state: the assertion used to run only after
    // the value and the key had already been appended.
    debug_assert!(
        key.len() >= 24,
        "Key too short: {} bytes. Keys must be serialized with KeyBytes::serialize()",
        key.len()
    );
    let ns_prefix: [u8; 8] = key[..8]
        .try_into()
        .expect("an 8-byte slice always converts to [u8; 8]");
    let ns = u64::from_le_bytes(ns_prefix);
    if ns != self.current_ns.load(Relaxed) {
        self.current_ns.store(ns, Relaxed);
        self.key_index.write().insert_ns_offset(ns);
    }
    let (value_block_num, value_entry_index) = self.add_entry_with_retry(val, &Value)?;
    // Stored key entry = value location (block number, entry index) followed by the key.
    let mut key_with_metadata = Vec::with_capacity(VALUE_LOCATION_SIZE + key.len());
    key_with_metadata.extend_from_slice(&value_block_num.to_le_bytes());
    key_with_metadata.extend_from_slice(&value_entry_index.to_le_bytes());
    key_with_metadata.extend_from_slice(key);
    self.add_entry_with_retry(&key_with_metadata, &Key)?;
    Ok(())
}
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub fn new_reader(&self) -> Result<SegmentReader, SegmentError> {
let km: Arc<Map> = match &self.key_handle {
| Some(handle) => handle.clone(),
| None => {
let writer = self.key_writer.lock();
match writer.as_ref() {
| Some(w) => w.map.clone(),
| None => return Err(CantCreateReader),
}
},
};
let vm: Arc<Map> = match &self.val_handle {
| Some(handle) => handle.clone(),
| None => {
let writer = self.val_writer.lock();
match writer.as_ref() {
| Some(w) => w.map.clone(),
| None => return Err(CantCreateReader),
}
},
};
SegmentReader::new(km, vm, self.key_index.clone())
}
fn add_entry_with_retry(
&self,
data: &[u8],
block_type: &BlockType,
) -> Result<(u64, u16), SegmentError> {
let (entries_mutex, block_counter) = match block_type {
| Key => (&self.current_key_entries, None),
| Value => (&self.current_val_entries, Some(&self.val_block_count)),
};
let mut accumulator = entries_mutex.lock();
let (ref mut offsets, ref mut entries) = *accumulator;
let entry_size = data.len() + size_of::<u8>();
let space_needed = entry_size + size_of::<u16>();
let current_used: usize = size_of::<u16>() + offsets.len() * size_of::<u16>() + entries.len();
if entry_size > MAX_ENTRY_SIZE {
drop(accumulator);
return self.split_across_blocks(data, block_type);
}
if current_used + space_needed <= BLOCK_SIZE {
let entry_idx = offsets.len() as u16;
let current_offset = if offsets.is_empty() {
0
} else {
offsets[offsets.len() - 1]
};
let next_offset = current_offset + (entry_size as u16);
offsets.push(next_offset);
entries.push(Complete as u8);
entries.extend_from_slice(data);
drop(accumulator);
let block_num = match block_counter {
| Some(counter) => counter.load(Relaxed),
| None => 0,
};
Ok((block_num, entry_idx))
} else {
drop(accumulator);
if let Err(e) = self.write_block(block_type) {
return Err(e);
}
let mut accumulator = entries_mutex.lock();
let (ref mut offsets, ref mut entries) = *accumulator;
let entry_idx = offsets.len() as u16;
let entry_size_u16 = entry_size as u16;
offsets.push(entry_size_u16);
entries.push(Complete as u8);
entries.extend_from_slice(data);
drop(accumulator);
let block_num = match block_counter {
| Some(counter) => counter.load(Relaxed),
| None => 0,
};
Ok((block_num, entry_idx))
}
}
fn write_chunk_to_new_block(
&self,
chunk: &[u8],
flag: EntryFlag,
block_type: &BlockType,
) -> Result<(), SegmentError> {
use crate::block::BlockBuilder;
match block_type {
| Key => {
let mut writer_guard = self.key_writer.lock();
let writer = match writer_guard.as_mut().ok_or(ReadOnly) {
| Ok(w) => w,
| Err(e) => return Err(e),
};
if let Err(e) = writer.write_block_direct(|mmap_slice| {
let mut builder = BlockBuilder::new(mmap_slice);
let _ = builder.add_entry(chunk, flag);
builder.finalize();
}) {
return Err(e);
}
drop(writer_guard);
self.key_index.write().inc_block_count(1);
},
| Value => {
let mut writer_guard = self.val_writer.lock();
let writer = match writer_guard.as_mut().ok_or(ReadOnly) {
| Ok(w) => w,
| Err(e) => return Err(e),
};
if let Err(e) = writer.write_block_direct(|mmap_slice| {
let mut builder = BlockBuilder::new(mmap_slice);
let _ = builder.add_entry(chunk, flag);
builder.finalize();
}) {
return Err(e);
}
drop(writer_guard);
self.val_block_count.fetch_add(1, Relaxed);
},
}
Ok(())
}
/// Writes out the pending key block and then the pending value block, if they hold entries.
///
/// # Errors
///
/// Returns `ReadOnly` when the segment has no key writer and forwards any error from
/// `write_block`.
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub fn flush(&self) -> Result<(), SegmentError> {
    if self.key_writer.lock().is_none() {
        return Err(ReadOnly);
    }
    // Each accumulator lock is released at the end of its `let` statement, i.e. before
    // `write_block` takes the same lock again.
    let keys_pending = !self.current_key_entries.lock().0.is_empty();
    if keys_pending {
        self.write_block(&Key)?;
    }
    let values_pending = !self.current_val_entries.lock().0.is_empty();
    if values_pending {
        self.write_block(&Value)?;
    }
    Ok(())
}
/// Stores an entry that is too large for a single block as a chain of dedicated blocks.
///
/// Any pending entries of the same type are written out first. `data` is then cut into
/// chunks of at most `MAX_ENTRY_SIZE - 1` bytes; the first chunk is written with the `Start`
/// flag, full-size chunks in between with `Middle`, and the remainder with `End`.
///
/// Returns the number of the block holding the first chunk together with entry index `0`.
///
/// # Errors
///
/// Returns `InsufficientSpace` for empty `data` and forwards errors from `write_block` and
/// `write_chunk_to_new_block`.
fn split_across_blocks(
    &self,
    data: &[u8],
    r#type: &BlockType,
) -> Result<(u64, u16), SegmentError> {
    if data.is_empty() {
        return Err(SegmentError::InsufficientSpace);
    }
    // One byte of every entry is taken by the flag.
    let max_chunk_size = MAX_ENTRY_SIZE - 1;
    let start_entry_index: u16 = 0;
    // Flush whatever is pending so the chain starts on a block boundary; `write_block`
    // does nothing when the accumulator is empty.
    self.write_block(r#type)?;
    let start_block_num = match r#type {
        // A shared lock is enough to read the count; this avoids blocking readers.
        | Key => self.key_index.read().block_count(),
        | Value => self.val_block_count.load(Relaxed),
    };
    if matches!(r#type, Key) {
        debug_assert!(
            data.len() > 26,
            "Multi-block key too short: {} bytes",
            data.len()
        );
    }
    let (first, mut remaining) = data.split_at(max_chunk_size.min(data.len()));
    self.write_chunk_to_new_block(first, Start, r#type)?;
    while remaining.len() > max_chunk_size {
        let (middle, rest) = remaining.split_at(max_chunk_size);
        self.write_chunk_to_new_block(middle, Middle, r#type)?;
        remaining = rest;
    }
    if !remaining.is_empty() {
        self.write_chunk_to_new_block(remaining, End, r#type)?;
    }
    Ok((start_block_num, start_entry_index))
}
/// Writes the pending entries of the given type as one block.
///
/// The accumulator is emptied under its own lock first; the block is then built directly
/// in the writer's buffer from the taken offsets and entries. On success the stream's block
/// counter is incremented and `bytes_written` grows by `BLOCK_SIZE`.
///
/// Returns `Ok(())` without writing anything when the accumulator is empty.
///
/// # Errors
///
/// Returns `ReadOnly` when the stream has no writer and forwards any error from
/// `write_block_direct`. In both cases the entries taken from the accumulator are dropped.
fn write_block(&self, r#type: &BlockType) -> Result<(), SegmentError> {
use crate::block::BlockBuilder;
match r#type {
| Key => {
// Take the pending key entries and leave an empty accumulator behind; the accumulator
// lock is released at the end of this inner block.
let (offsets, entries) = {
let mut key_acc = self.current_key_entries.lock();
if key_acc.0.is_empty() {
return Ok(()); }
mem::replace(&mut *key_acc, (Vec::new(), Vec::new()))
};
let result = {
let mut writer_guard = self.key_writer.lock();
match writer_guard.as_mut() {
| Some(writer) => {
let res = writer.write_block_direct(|mmap_slice| {
let builder =
BlockBuilder::from_parts(mmap_slice, offsets, entries);
builder.finalize();
});
if res.is_ok() {
// Release the writer lock before taking the index lock.
drop(writer_guard);
self.key_index.write().inc_block_count(1);
self.bytes_written.fetch_add(BLOCK_SIZE as u64, Relaxed);
}
res
},
| None => return Err(ReadOnly),
}
};
result
},
| Value => {
// Same sequence as the key branch, against the value accumulator and value writer.
let (offsets, entries) = {
let mut val_acc = self.current_val_entries.lock();
if val_acc.0.is_empty() {
return Ok(()); }
mem::replace(&mut *val_acc, (Vec::new(), Vec::new()))
};
let result = {
let mut writer_guard = self.val_writer.lock();
match writer_guard.as_mut() {
| Some(writer) => {
let res = writer.write_block_direct(|mmap_slice| {
let builder =
BlockBuilder::from_parts(mmap_slice, offsets, entries);
builder.finalize();
});
if res.is_ok() {
// Release the writer lock before updating the counters.
drop(writer_guard);
self.val_block_count.fetch_add(1, Relaxed);
self.bytes_written.fetch_add(BLOCK_SIZE as u64, Relaxed);
}
res
},
| None => return Err(ReadOnly),
}
};
result
},
}
}
/// Finishes the segment and turns it read-only.
///
/// Steps, in order: flush pending blocks, rebuild the key index, write the index and the
/// metadata of the key file and close its writer, write the metadata of the value file and
/// close its writer, then detach both writers.
///
/// # Errors
///
/// Returns `ReadOnly` when the segment has no key writer and forwards the first error of
/// any step. The writers stay attached when a step fails.
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub(crate) fn close(&self) -> Result<(), SegmentError> {
    if self.key_writer.lock().is_none() {
        return Err(ReadOnly);
    }
    self.flush()?;
    self.rebuild_key_index()?;
    {
        let mut key_guard = self.key_writer.lock();
        if let Some(writer) = key_guard.as_mut() {
            let block_count = writer.block_count();
            // The index lock is held only while the index is measured and written.
            let (index_size, index_start) = {
                let key_index = self.key_index.write();
                let size = key_index.size();
                let start = writer.write_index(&*key_index)?;
                (size, start)
            };
            let key_metadata = Metadata::new(self.key_id, block_count, index_size as u64, index_start);
            writer.write_metadata(key_metadata)?;
            writer.close()?;
        }
    }
    {
        let mut val_guard = self.val_writer.lock();
        if let Some(writer) = val_guard.as_mut() {
            let block_count = writer.block_count();
            writer.begin_close();
            // Value files carry no index, hence the two zeroes.
            let val_metadata = Metadata::new(self.val_id, block_count, 0, 0);
            writer.write_metadata(val_metadata)?;
            writer.close()?;
        }
    }
    *self.key_writer.lock() = None;
    *self.val_writer.lock() = None;
    Ok(())
}
/// Rebuilds the key index's bloom data from the key blocks stored in the key file.
///
/// Every key block counted by the index is read and deserialized. For each entry longer
/// than 26 bytes, the first 10 and the last 16 bytes are stripped and the remainder is
/// recorded together with the block's number; shorter entries are skipped. The collected
/// pairs are then passed to `Index::rebuild_bloom_from_keys`.
///
/// # Errors
///
/// Returns `CantCreateReader` when neither a key handle nor a key writer is available, and
/// forwards any error from reading a block.
pub fn rebuild_key_index(&self) -> Result<(), SegmentError> {
    use crate::utils::Deserializer;
    let key_map = if let Some(handle) = &self.key_handle {
        handle.clone()
    } else {
        let guard = self.key_writer.lock();
        let Some(active) = guard.as_ref() else {
            return Err(CantCreateReader);
        };
        active.map.clone()
    };
    let block_count = self.key_index.read().block_count() as usize;
    let mut key_block_pairs = Vec::with_capacity(block_count * 100);
    for block_idx in 0..block_count {
        let start = block_idx * BLOCK_SIZE;
        let block_bytes = key_map.read_range(start..start + BLOCK_SIZE, |slice| {
            bytes::Bytes::copy_from_slice(slice)
        })?;
        let block = crate::block::ReadOnlyBlock::deserialize(block_bytes);
        let entry_count = block.num_entries() as usize;
        for entry_idx in 0..entry_count {
            let Some((_, entry_bytes)) = block.get(entry_idx) else {
                continue;
            };
            if entry_bytes.len() <= 26 {
                continue;
            }
            // Drop the 10 leading and the 16 trailing bytes of the stored entry.
            let key_without_ts = entry_bytes[10..entry_bytes.len() - 16].to_vec();
            key_block_pairs.push((key_without_ts, block_idx as u64));
        }
    }
    let collected = key_block_pairs.iter().map(|(k, b)| (k.as_slice(), *b));
    self.key_index.write().rebuild_bloom_from_keys(collected);
    Ok(())
}
/// Returns the segment's identifier, which is the id of its key file.
#[inline]
pub fn id(&self) -> u64 {
self.key_id
}
pub fn size_in_bytes(&self) -> u64 {
let key_size = if let Some(ref handle) = self.key_handle {
handle.len() as u64
} else {
0
};
let val_size = if let Some(ref handle) = self.val_handle {
handle.len() as u64
} else {
0
};
let handle_size = key_size + val_size;
if handle_size > 0 {
handle_size
} else {
self.bytes_written.load(Relaxed)
}
}
pub fn reader(&self) -> Result<crate::segment_reader::SegmentReader, SegmentError> {
if !self.is_read_only() {
return Err(SegmentError::ReadOnly);
}
let key_handle = match self.key_handle.as_ref().ok_or(SegmentError::ReadOnly) {
| Ok(h) => h.clone(),
| Err(e) => return Err(e),
};
let val_handle = match self.val_handle.as_ref().ok_or(SegmentError::ReadOnly) {
| Ok(h) => h.clone(),
| Err(e) => return Err(e),
};
let key_index = self.key_index.clone();
crate::segment_reader::SegmentReader::new(key_handle, val_handle, key_index)
}
}
impl Drop for Segment {
fn drop(&mut self) {
let res = self.close();
if let Err(_e) = res &&
!matches!(_e, ReadOnly)
{
}
}
}
#[cfg(test)]
#[allow(clippy::question_mark_used)]
#[allow(clippy::missing_safety_doc)]
#[allow(clippy::undocumented_unsafe_blocks)]
mod tests {
use std::{
collections::HashMap,
sync::Arc,
};
use bytes::Bytes;
use rand::{
Rng,
RngCore,
prelude::SliceRandom,
rng,
};
use tempfile::tempdir;
use super::*;
use crate::{
block::Block,
hlc::{
HLC,
HybridLogicalClock,
},
keypair::{
DEFAULT_NS,
KeyBytes,
ValueBytes,
},
map::Map,
memtable::Memtable,
segment_reader::SegmentReader,
segment_writer::SegmentWriter,
};
// Builds a writable segment (key id 1, value id 2, seed 42) backed by two maps of
// 4096 * 10 bytes inside a fresh temporary directory. The directory is returned alongside
// the segment so the caller can keep it alive for the duration of the test.
fn create_test_segment() -> (Arc<Segment>, tempfile::TempDir) {
    let dir = tempdir().expect("failed to create temp dir");
    let random_id: u64 = rand::random();

    let key_file = dir.path().join(format!("test-key-segment-{}", random_id));
    let key_map = Map::new(key_file, 4096 * 10).expect("failed to create key map");
    let key_writer =
        SegmentWriter::new(Arc::new(key_map)).expect("failed to create key writer");

    let val_file = dir.path().join(format!("test-val-segment-{}", random_id));
    let val_map = Map::new(val_file, 4096 * 10).expect("failed to create val map");
    let val_writer =
        SegmentWriter::new(Arc::new(val_map)).expect("failed to create val writer");

    let seed = 42i64;
    (Arc::new(Segment::new(1, 2, seed, key_writer, val_writer)), dir)
}
// Builds a key/value pair in the default namespace; the key is stamped with the clock's
// current time.
fn create_kv(key: &str, value: &str, clock: &HybridLogicalClock) -> (KeyBytes, ValueBytes) {
    let key_bytes = KeyBytes::new(DEFAULT_NS, Bytes::from(key.to_string()), clock.time());
    let value_bytes = ValueBytes::new(DEFAULT_NS, Bytes::from(value.to_string()));
    (key_bytes, value_bytes)
}
// Same as `create_kv`, but returns the serialized form of both halves, ready to be passed
// to `Segment::write`.
fn create_serialized_kv(key: &str, value: &str, clock: &HybridLogicalClock) -> (Bytes, Bytes) {
    use crate::utils::Serializer;
    let (key_bytes, value_bytes) = create_kv(key, value, clock);
    let serialized_key = key_bytes.serialize();
    let serialized_value = value_bytes.serialize();
    (serialized_key, serialized_value)
}
// A single small pair can be written to a fresh segment.
#[test]
fn test_segment_basic_write() {
    let (segment, _dir) = create_test_segment();
    let clock = HybridLogicalClock::new();
    let (key, val) = create_serialized_kv("abc", "123", &clock);
    let outcome = segment.write(&key, &val);
    assert!(outcome.is_ok(), "Failed to write to segment: {:?}", outcome);
}
// Ten consecutive small writes all succeed.
#[test]
fn test_segment_multiple_writes() {
    let (segment, _dir) = create_test_segment();
    let clock = HybridLogicalClock::new();
    for i in 0u32..10 {
        let (k, v) =
            create_serialized_kv(&format!("key_{}", i), &format!("value_{}", i * 10), &clock);
        let outcome = segment.write(&k, &v);
        assert!(outcome.is_ok(), "Failed to write entry {}: {:?}", i, outcome);
    }
}
// Writing keys whose namespace keeps changing records namespace offsets in the key index.
#[test]
fn test_segment_namespace_handling() {
    use crate::utils::Serializer;
    let (segment, _dir) = create_test_segment();
    let clock = HybridLogicalClock::new();
    let namespaces = [1u64, 2u64, 1u64, 3u64, 2u64];
    for ns in namespaces.iter() {
        let key = KeyBytes::new(*ns, Bytes::from("testkey"), clock.time());
        let val = ValueBytes::new(*ns, Bytes::from("testvalue"));
        let serialized_key = key.serialize();
        let serialized_val = val.serialize();
        let outcome = segment.write(serialized_key.as_ref(), serialized_val.as_ref());
        assert!(
            outcome.is_ok(),
            "Failed to write entry for ns {}: {:?}",
            ns,
            outcome
        );
    }
    let tracked = segment.key_index.write().ns_offset_count();
    assert!(
        tracked >= 3,
        "Key index should track at least 3 namespace changes"
    );
}
// An 8192-byte random value can be written.
#[test]
fn test_segment_large_entry() {
    use crate::utils::Serializer;
    let (segment, _dir) = create_test_segment();
    let clock = HybridLogicalClock::new();
    let mut payload = vec![0u8; 8192];
    rng().fill_bytes(&mut payload);
    let key = KeyBytes::new(DEFAULT_NS, Bytes::from("large_entry_key"), clock.time());
    let val = ValueBytes::new(DEFAULT_NS, Bytes::from(payload));
    let serialized_key = key.serialize();
    let serialized_val = val.serialize();
    let outcome = segment.write(serialized_key.as_ref(), serialized_val.as_ref());
    assert!(outcome.is_ok(), "Failed to write large entry: {:?}", outcome);
}
// Twenty writes cycling through value sizes of 10, 1000, 4000 and 6000 bytes all succeed.
#[test]
fn test_segment_mixed_entry_sizes() {
    use crate::utils::Serializer;
    // Value sizes, selected by `i % 4`.
    const SIZES: [usize; 4] = [10, 1000, 4000, 6000];
    let (segment, _dir) = create_test_segment();
    let clock = HybridLogicalClock::new();
    let mut rng = rng();
    for i in 0u32..20 {
        let size = SIZES[(i % 4) as usize];
        let mut data = vec![0u8; size];
        rng.fill_bytes(&mut data);
        let key = KeyBytes::new(DEFAULT_NS, Bytes::from(format!("key_{}", i)), clock.time());
        let val = ValueBytes::new(DEFAULT_NS, Bytes::from(data));
        let serialized_key = key.serialize();
        let serialized_val = val.serialize();
        let outcome = segment.write(serialized_key.as_ref(), serialized_val.as_ref());
        assert!(
            outcome.is_ok(),
            "Failed to write entry with size {}: {:?}",
            size,
            outcome
        );
    }
}
// A reader can be created from a freshly created, still writable segment.
#[test]
fn test_segment_reader_creation() {
    let (segment, _dir) = create_test_segment();
    assert!(segment.new_reader().is_ok());
}
// After 1000 small writes both writers report at least one block.
#[test]
fn test_segment_with_many_small_entries() {
    let (segment, _dir) = create_test_segment();
    let clock = HybridLogicalClock::new();
    for i in 0u32..1000 {
        let key_text = format!("key_{}", i);
        let value_text = format!("Value{}", i);
        let (key, val) = create_serialized_kv(&key_text, &value_text, &clock);
        let outcome = segment.write(&key, &val);
        assert!(
            outcome.is_ok(),
            "Failed to write small entry {}: {:?}",
            i,
            outcome
        );
    }
    let key_blocks = segment.key_writer.lock().as_ref().unwrap().block_count();
    assert!(key_blocks > 0, "Should have created some key blocks");
    let value_blocks = segment.val_writer.lock().as_ref().unwrap().block_count();
    assert!(value_blocks > 0, "Should have created some value blocks");
}
// Pairs for `key_a` .. `key_z`, prepared up front, can be written in alphabetical order.
#[test]
fn test_segment_sequential_keys() {
    let (segment, _dir) = create_test_segment();
    let clock = HybridLogicalClock::new();
    let pairs: Vec<_> = ('a'..='z')
        .map(|c| create_serialized_kv(&format!("key_{}", c), &format!("value_{}", c), &clock))
        .collect();
    for (key, val) in pairs {
        let outcome = segment.write(&key, &val);
        assert!(
            outcome.is_ok(),
            "Failed to write sequential key: {:?}",
            outcome
        );
    }
}
// One hundred pairs, shuffled after creation, can be written in random key order.
#[test]
fn test_segment_random_order_keys() {
    let (segment, _dir) = create_test_segment();
    let clock = HybridLogicalClock::new();
    let mut pairs: Vec<_> = (0..100)
        .map(|i| {
            let key = format!("random_key_{:03}", i);
            let value = format!("random_value_{:03}", i);
            create_serialized_kv(&key, &value, &clock)
        })
        .collect();
    let mut shuffler = rand::rng();
    pairs.shuffle(&mut shuffler);
    for (key, val) in pairs {
        let outcome = segment.write(&key, &val);
        assert!(
            outcome.is_ok(),
            "Failed to write random-order key: {:?}",
            outcome
        );
    }
}
// After 512 writes and a flush the key index counts at least one block.
#[test]
fn test_segment_index_building() {
    let (segment, _dir) = create_test_segment();
    let clock = HybridLogicalClock::new();
    let _initial_key_blocks = segment.key_index.write().block_count();
    for i in 0u32..512 {
        let key_text = format!("key_{}", i);
        let value_text = format!("value_{}", i * 10);
        let (key, val) = create_serialized_kv(&key_text, &value_text, &clock);
        let outcome = segment.write(&key, &val);
        assert!(outcome.is_ok(), "Failed to write entry {}: {:?}", i, outcome);
    }
    segment.flush().expect("failed to flush segment");
    let final_key_blocks = segment.key_index.write().block_count();
    assert!(
        final_key_blocks > 0,
        "there should be at least 1 block in the key index for 512 entries, found: {}",
        final_key_blocks
    );
}
// The value block count observed after closing a segment is restored when the segment is
// reopened through the builder.
#[test]
fn test_val_block_count_persistence() {
    use crate::segment_builder::SegmentBuilder;
    let dir = tempdir().expect("failed to create temp dir");
    let builder = SegmentBuilder::new(dir.path().to_path_buf())
        .expect("failed to create segment builder");
    let segment_id = 100;
    let seed = 42;
    let segment = builder
        .new_segment(segment_id, seed, DEFAULT_SEGMENT_SIZE)
        .expect("failed to create segment");

    let num_entries: u32 = 20;
    let value_size = 1024;
    for i in 0..num_entries {
        // Key layout: 8 zero bytes, the counter as little-endian u32, 16 zero bytes.
        let mut key = Vec::new();
        key.extend_from_slice(&[0u8; 8]);
        key.extend_from_slice(&i.to_le_bytes());
        key.extend_from_slice(&[0u8; 16]);
        let val = vec![i as u8; value_size];
        segment.write(&key, &val).expect("failed to write entry");
    }

    let before_close_count = segment
        .val_block_count
        .load(std::sync::atomic::Ordering::Relaxed);
    assert!(
        before_close_count > 0,
        "Expected at least one value block to be written"
    );

    segment.close().expect("failed to close segment");
    let after_close_count = segment
        .val_block_count
        .load(std::sync::atomic::Ordering::Relaxed);
    drop(segment);

    let reopened = builder.open(segment_id).expect("failed to reopen segment");
    let restored_block_count = reopened
        .val_block_count
        .load(std::sync::atomic::Ordering::Relaxed);
    assert_eq!(
        restored_block_count, after_close_count,
        "val_block_count should be restored to {} after reopening, but got {}",
        after_close_count, restored_block_count
    );
}
}