use std::{
ops::{
Bound,
DerefMut,
},
sync::Arc,
};
use bytes::{
Buf,
Bytes,
BytesMut,
};
use crossbeam_queue::ArrayQueue;
use tracing::instrument;
use crate::{
block::{
BLOCK_SIZE,
Block,
EntryFlag,
},
errs::{
SegmentError,
SegmentError::{
CorruptedBlock,
InvalidSize,
MissingKey,
ReadOutOfBounds,
},
},
index::Index,
keypair::{
KeyBytes,
ValueBytes,
},
map::Map,
segment::{
BlockType,
BlockType::{
Key,
Value,
},
DEFAULT_SEGMENT_SIZE,
Metadata,
},
segment_iterator::{
RawSegmentScanIterator,
SeekingBlockIterator,
SegmentBlockIterator,
SegmentScanIterator,
convert_bound_to_bytes,
},
utils::Deserializer,
};
/// Tunables accepted by `SegmentReader::with_config`.
///
/// NOTE(review): `SegmentReader::with_config` currently ignores this value entirely.
#[derive(Debug, Clone)]
pub(crate) struct ReadConfig {
    /// Read-ahead amount; presumably a number of blocks — TODO confirm once it is consulted.
    read_ahead: usize,
}

impl ReadConfig {
    /// `read_ahead` value produced by [`ReadConfig::default`].
    const DEFAULT_READ_AHEAD: usize = 4;
}

impl Default for ReadConfig {
    fn default() -> Self {
        ReadConfig {
            read_ahead: Self::DEFAULT_READ_AHEAD,
        }
    }
}
/// Fixed-capacity cache of value blocks keyed by block index.
///
/// Slots are overwritten in round-robin order, so once all `N` slots are filled the oldest
/// *inserted* entry is evicted first (FIFO, not LRU). Inserting an index that is already cached
/// simply adds another slot for it; `get` returns the first one found.
#[derive(Debug)]
struct BlockCache<const N: usize> {
    /// `None` marks a slot that has never been filled. Keeping emptiness out of band avoids a
    /// sentinel index that could alias a real block index (e.g. block 0).
    entries: [Option<(usize, crate::block::ReadOnlyBlock)>; N],
    /// Slot the next `insert` overwrites.
    next: usize,
}

impl<const N: usize> BlockCache<N> {
    /// Creates a cache with every slot empty.
    fn new() -> Self {
        Self {
            entries: core::array::from_fn(|_| None),
            next: 0,
        }
    }

    /// Returns the cached block for `block_index`, if any filled slot holds it.
    #[inline]
    fn get(&self, block_index: usize) -> Option<&crate::block::ReadOnlyBlock> {
        self.entries
            .iter()
            .flatten()
            .find(|(idx, _)| *idx == block_index)
            .map(|(_, block)| block)
    }

    /// Stores `block` under `block_index`, evicting whatever occupied the next slot.
    /// A zero-capacity cache stores nothing.
    #[inline]
    fn insert(&mut self, block_index: usize, block: crate::block::ReadOnlyBlock) {
        if N == 0 {
            return;
        }
        self.entries[self.next] = Some((block_index, block));
        self.next = (self.next + 1) % N;
    }
}
/// Read-side view of one segment: a key file and a value file, each addressed as consecutive
/// `BLOCK_SIZE`-byte blocks, plus the shared key [`Index`] used to locate key blocks.
#[derive(Debug)]
pub struct SegmentReader {
    /// Backing map for the key blocks.
    key_handle: Arc<Map>,
    /// Backing map for the value blocks.
    val_handle: Arc<Map>,
    /// Filter/index consulted before any key block is read.
    pub(crate) key_index: Arc<parking_lot::RwLock<Index>>,
    /// Key blocks that reads are allowed to touch (decided once in `with_config`).
    pub(crate) visible_key_blocks: usize,
    /// Whole value blocks present in the value map.
    visible_val_blocks: usize,
    /// Key map length divided by `BLOCK_SIZE`, rounding a trailing partial block up.
    pub(crate) num_blocks: usize,
    /// Small cache of recently read value blocks, consulted by `read_value`.
    value_block_cache: parking_lot::Mutex<BlockCache<8>>,
}
impl SegmentReader {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub fn new(
key_handle: Arc<Map>,
val_handle: Arc<Map>,
key_index: Arc<parking_lot::RwLock<Index>>,
) -> Result<Self, SegmentError> {
Self::with_config(key_handle, val_handle, key_index, ReadConfig::default())
}
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub(crate) fn with_config(
key_handle: Arc<Map>,
val_handle: Arc<Map>,
key_index: Arc<parking_lot::RwLock<Index>>,
config: ReadConfig,
) -> Result<Self, SegmentError> {
let segment_size = key_handle.len();
let metadata_block_count = if segment_size >= 32 {
match key_handle.read_range(segment_size - 32..segment_size, |slice| {
Metadata::from(Bytes::copy_from_slice(slice))
}) {
| Ok(m) => {
let index_end = m.index_start() + m.index_size();
if index_end + 32 == segment_size {
Some(m.block_count())
} else {
None
}
},
| Err(_) => None,
}
} else {
None
};
let index_blocks = key_index.read().num_blocks() as usize;
let num_blocks = segment_size.div_ceil(BLOCK_SIZE);
let visible_key_blocks = if let Some(block_count) = metadata_block_count {
block_count as usize
} else if index_blocks > 0 {
index_blocks
} else if segment_size >= DEFAULT_SEGMENT_SIZE as usize {
0
} else {
num_blocks
};
let visible_val_blocks = val_handle.len() / BLOCK_SIZE;
Ok(Self {
key_handle,
val_handle,
key_index,
visible_key_blocks,
visible_val_blocks,
num_blocks,
value_block_cache: parking_lot::Mutex::new(BlockCache::new()),
})
}
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>, SegmentError> {
debug_assert!(
key.len() >= 24,
"Key too short: {} bytes. Keys must be serialized with KeyBytes::serialize()",
key.len()
);
let key_without_ts = &key[..key.len() - 16];
if !self.key_index.read().may_contain(key_without_ts) {
return Ok(None);
}
let key_block_offset = match self.key_index.read().get_block(key_without_ts) {
| Some(v) => v,
| None => {
return Ok(None);
},
};
let key_block = match self.read_key_block(key_block_offset as usize) {
| Ok(v) => v,
| Err(e) => {
return Err(MissingKey);
},
};
for entry_index in 0..key_block.num_entries() as usize {
let (flag, data) = match key_block.get(entry_index) {
| Some(v) => v,
| None => continue,
};
if data.len() < 10 {
continue; }
let value_block_num = u64::from_le_bytes(data[0..8].try_into().unwrap());
let value_entry_index = u16::from_le_bytes(data[8..10].try_into().unwrap());
let actual_key_data = &data[10..];
let key_matches = match flag {
| EntryFlag::Complete => {
actual_key_data == key
},
| EntryFlag::Start => {
match self.read_key(key_block_offset as usize, entry_index) {
| Ok(full_key_data) => {
if full_key_data.len() < 10 {
continue;
}
let full_actual_key = &full_key_data[10..];
full_actual_key == key
},
| Err(e) => {
continue;
},
}
},
| _ => continue, // Skip middle or end entries
};
if key_matches {
return match self.read_value(value_block_num as usize, value_entry_index as usize) {
| Ok(v) => Ok(Some(v)),
| Err(e) => Err(e),
};
}
}
Ok(None)
}
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub(crate) fn read_key_block(
&self,
block_index: usize,
) -> Result<crate::block::ReadOnlyBlock, SegmentError> {
if block_index >= self.visible_key_blocks {
return Err(ReadOutOfBounds);
}
let block = match self.read_block_at(block_index, Key) {
| Ok(v) => v,
| Err(e) => return Err(e),
};
Ok(block)
}
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub(crate) fn read_multiblock_entry(
&self,
flag: EntryFlag,
initial_data: &[u8],
starting_block: usize,
) -> Result<Bytes, SegmentError> {
use EntryFlag::*;
match flag {
| Complete => Ok(Bytes::copy_from_slice(initial_data)),
| Start => {
let mut buffer = BytesMut::with_capacity(initial_data.len() * 2);
buffer.extend_from_slice(initial_data);
let mut current_block_index = starting_block;
let mut found_end = false;
while current_block_index < self.visible_key_blocks && !found_end {
let next_block = match self.read_key_block(current_block_index) {
| Ok(b) => b,
| Err(e) => return Err(e),
};
if next_block.num_entries() == 0 {
current_block_index += 1;
continue;
}
let (next_flag, next_data) = match next_block.get(0).ok_or(CorruptedBlock) {
| Ok(v) => v,
| Err(e) => return Err(e),
};
match next_flag {
| Middle => {
buffer.extend_from_slice(next_data);
current_block_index += 1;
},
| End => {
buffer.extend_from_slice(next_data);
found_end = true;
},
| _ => return Err(CorruptedBlock),
}
}
if !found_end {
return Err(CorruptedBlock);
}
Ok(buffer.freeze())
},
| Middle | End => Err(CorruptedBlock),
}
}
fn read_key(&self, key_block_index: usize, entry_index: usize) -> Result<Bytes, SegmentError> {
let block = match self.read_key_block(key_block_index) {
| Ok(b) => b,
| Err(e) => return Err(e),
};
let (flag, data) = match block.get(entry_index).ok_or(MissingKey) {
| Ok(v) => v,
| Err(e) => return Err(e),
};
self.read_multiblock_entry(flag, data, key_block_index + 1)
}
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub(crate) fn read_value(
&self,
val_block_index: usize,
entry_index: usize,
) -> Result<Bytes, SegmentError> {
if val_block_index >= self.visible_val_blocks {
return Err(ReadOutOfBounds);
}
let block = {
let cache = self.value_block_cache.lock();
if let Some(cached_block) = cache.get(val_block_index) {
cached_block.clone()
} else {
drop(cache);
let block = match self.read_block_at(val_block_index, Value) {
| Ok(v) => v,
| Err(e) => {
return Err(e);
},
};
self.value_block_cache
.lock()
.insert(val_block_index, block.clone());
block
}
};
if entry_index >= block.num_entries() as usize {
return Err(MissingKey);
}
let (flag, data) = match block.get(entry_index) {
| Some(v) => v,
| None => {
return Err(MissingKey);
},
};
match flag {
| EntryFlag::Complete => {
Ok(Bytes::copy_from_slice(data))
},
| EntryFlag::Start => {
let mut buffer = BytesMut::with_capacity(data.len() * 2);
buffer.extend_from_slice(data);
let mut current_block_index = val_block_index + 1;
let mut found_end = false;
if current_block_index >= self.visible_val_blocks {
return Err(CorruptedBlock);
}
while current_block_index < self.visible_val_blocks && !found_end {
let next_block = match self.read_block_at(current_block_index, Value) {
| Ok(v) => v,
| Err(e) => {
return Err(e);
},
};
if next_block.num_entries() == 0 {
current_block_index += 1;
continue;
}
let (next_flag, next_data) = match next_block.get(0) {
| Some(v) => v,
| None => {
return Err(CorruptedBlock);
},
};
match next_flag {
| EntryFlag::Middle => {
buffer.extend_from_slice(next_data);
current_block_index += 1;
},
| EntryFlag::End => {
buffer.extend_from_slice(next_data);
found_end = true;
},
| _ => {
return Err(CorruptedBlock);
},
}
}
if !found_end {
return Err(CorruptedBlock);
}
Ok(buffer.freeze())
},
| EntryFlag::Middle | EntryFlag::End => Err(CorruptedBlock),
}
}
fn find_key(&self, key_hash: u64, key_block_offset: u64) -> Result<Bytes, SegmentError> {
let block_index = key_block_offset as usize;
let block = match self.read_key_block(block_index) {
| Ok(b) => b,
| Err(e) => return Err(e),
};
if let Some((flag, data)) = block.get(0) {
return self.read_multiblock_entry(flag, data, block_index + 1);
}
Err(ReadOutOfBounds)
}
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub(crate) fn visible_blocks(&self) -> (usize, usize) {
(self.visible_key_blocks, self.visible_val_blocks)
}
pub(crate) fn key_handle(&self) -> &Arc<Map> {
&self.key_handle
}
pub(crate) fn val_handle(&self) -> &Arc<Map> {
&self.val_handle
}
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub(crate) fn iter<'a>(&'a mut self) -> SegmentBlockIterator<'a> {
SegmentBlockIterator::new(self)
}
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub(crate) fn seeking_iter<'a>(&'a mut self) -> SeekingBlockIterator<'a> {
SeekingBlockIterator::new(self, 0, self.num_blocks)
}
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
fn read_block_at(
&self,
block_index: usize,
block_type: BlockType,
) -> Result<crate::block::ReadOnlyBlock, SegmentError> {
let offset = block_index * BLOCK_SIZE;
let bytes = match block_type {
| Key => {
if offset + BLOCK_SIZE > self.key_handle.len() {
return Err(ReadOutOfBounds);
}
match self
.key_handle
.read_range(offset..offset + BLOCK_SIZE, |slice| {
Bytes::copy_from_slice(slice)
}) {
| Ok(b) => b,
| Err(e) => return Err(e),
}
},
| Value => {
if offset + BLOCK_SIZE > self.val_handle.len() {
return Err(ReadOutOfBounds);
}
match self
.val_handle
.read_range(offset..offset + BLOCK_SIZE, |slice| {
Bytes::copy_from_slice(slice)
}) {
| Ok(b) => b,
| Err(e) => return Err(e),
}
},
};
let block = crate::block::ReadOnlyBlock::deserialize(bytes);
Ok(block)
}
pub fn may_contain(&self, key_without_timestamp: &[u8]) -> bool {
self.key_index.read().may_contain(key_without_timestamp)
}
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub fn scan(self, lower_bound: Bound<&[u8]>, upper_bound: Bound<&[u8]>) -> SegmentScanIterator {
let start_block = match lower_bound {
| Bound::Included(key) | Bound::Excluded(key) => {
debug_assert!(
key.len() >= 24,
"Scan key too short: {} bytes. Keys must be serialized",
key.len()
);
let key_without_ts = &key[..key.len() - 16];
match self.key_index.read().get_block(key_without_ts) {
| Some(block_offset) => block_offset as usize,
| None => 0, }
},
| Bound::Unbounded => 0, // Start from the beginning
};
SegmentScanIterator::new(self, (lower_bound, upper_bound), start_block)
}
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub(crate) fn scan_raw(
self,
lower_bound: Bound<&[u8]>,
upper_bound: Bound<&[u8]>,
) -> RawSegmentScanIterator {
let start_block = match lower_bound {
| Bound::Included(key) | Bound::Excluded(key) => {
debug_assert!(
key.len() >= 24,
"Scan key too short: {} bytes. Keys must be serialized",
key.len()
);
let key_without_ts = &key[..key.len() - 16];
match self.key_index.read().get_block(key_without_ts) {
| Some(block_offset) => block_offset as usize,
| None => 0,
}
},
| Bound::Unbounded => 0,
};
RawSegmentScanIterator::new(self, (lower_bound, upper_bound), start_block)
}
#[inline]
pub(crate) fn num_blocks(&self) -> usize {
self.num_blocks
}
}
#[cfg(test)]
// The fixtures write finalized blocks through raw pointers (`Block::finalize`).
#[allow(clippy::question_mark_used)]
#[allow(clippy::missing_safety_doc)]
#[allow(clippy::undocumented_unsafe_blocks)]
mod tests {
    use std::sync::Arc;

    use tempfile::tempdir;

    use super::*;
    use crate::{
        block::{
            Block,
            EntryFlag,
        },
        map::Map,
    };

    /// Index seed used by the block-level tests.
    const INDEX_SEED: i64 = 1234;

    /// Builds a serialized test key: 8 zero bytes, `user_key`, then the 16 trailing bytes that
    /// `SegmentReader` strips before index lookups.
    fn create_test_key(user_key: &[u8]) -> Vec<u8> {
        let mut key = Vec::with_capacity(8 + user_key.len() + 16);
        key.extend_from_slice(&[0u8; 8]);
        key.extend_from_slice(user_key);
        key.extend_from_slice(&[0u8; 16]);
        key
    }

    /// Returns `key` without its 16 trailing bytes, i.e. the form inserted into the index.
    fn key_without_ts(key: &[u8]) -> &[u8] {
        &key[..key.len() - 16]
    }

    /// Creates a `size`-byte `Map` in a fresh temporary directory. Keep the returned `TempDir`
    /// alive while the map is in use: dropping it deletes the directory.
    fn create_test_map(size: usize) -> (tempfile::TempDir, Arc<Map>) {
        let dir = tempdir().unwrap();
        let map = Arc::new(Map::new(dir.path().join("test.map"), size as u64).unwrap());
        (dir, map)
    }

    /// Writes `block` into the `BLOCK_SIZE`-byte range of `map` starting at byte `offset`.
    fn write_block(map: &Arc<Map>, offset: usize, block: &Block) {
        map.write_to_range(offset..(offset + BLOCK_SIZE), |slice| unsafe {
            // SAFETY (assumed — TODO confirm): `slice` spans the whole requested BLOCK_SIZE
            // range and `Block::finalize` writes at most BLOCK_SIZE bytes.
            block.finalize(slice.as_mut_ptr());
        })
        .unwrap();
    }

    /// Creates a map of `num_blocks` blocks where block `i` holds one `Complete` entry made of
    /// sixteen `i as u8` bytes.
    fn prepare_blocks_map(num_blocks: usize) -> (tempfile::TempDir, Arc<Map>) {
        let (dir, map) = create_test_map(num_blocks * BLOCK_SIZE);
        for i in 0..num_blocks {
            let data = vec![i as u8; 16];
            let mut block = Block::new();
            block.add_entry(&data, EntryFlag::Complete).unwrap();
            write_block(&map, i * BLOCK_SIZE, &block);
        }
        (dir, map)
    }

    /// Creates ten-block key and value maps in one temporary directory, plus a fresh key index.
    fn prepare_test_segment_for_get() -> (tempfile::TempDir, Arc<Map>, Arc<Map>, Index) {
        let dir = tempdir().unwrap();
        let segment_len = BLOCK_SIZE as u64 * 10;
        let key_map = Arc::new(Map::new(dir.path().join("key_segment"), segment_len).unwrap());
        let val_map = Arc::new(Map::new(dir.path().join("val_segment"), segment_len).unwrap());
        (dir, key_map, val_map, Index::new(1, 42))
    }

    /// Prefixes `key` with the 10-byte key entry header (value block, entry index; both LE).
    fn add_key_metadata(key: &[u8], value_block: u64, value_entry: u16) -> Vec<u8> {
        let mut result = Vec::with_capacity(10 + key.len());
        result.extend_from_slice(&value_block.to_le_bytes());
        result.extend_from_slice(&value_entry.to_le_bytes());
        result.extend_from_slice(key);
        result
    }

    /// Writes a key block at byte `offset` holding one `Complete` entry for `key` that points
    /// at entry `value_entry` of value block `value_block`.
    fn write_key_entry(
        key_map: &Arc<Map>,
        offset: usize,
        key: &[u8],
        value_block: u64,
        value_entry: u16,
    ) {
        let entry = add_key_metadata(key, value_block, value_entry);
        let mut block = Block::new();
        block.add_entry(&entry, EntryFlag::Complete).unwrap();
        write_block(key_map, offset, &block);
    }

    /// Writes a value block at byte `offset` holding `value` as one `Complete` entry.
    fn write_value_entry(val_map: &Arc<Map>, offset: usize, value: &[u8]) {
        let mut block = Block::new();
        block.add_entry(value, EntryFlag::Complete).unwrap();
        write_block(val_map, offset, &block);
    }

    /// Writes `value` as two fragments: `value[..split]` as `Start` in value block 0 and the
    /// remainder as `End` in value block 1.
    fn write_split_value(val_map: &Arc<Map>, value: &[u8], split: usize) {
        let (head, tail) = value.split_at(split);
        let mut first = Block::new();
        first.add_entry(head, EntryFlag::Start).unwrap();
        let mut second = Block::new();
        second.add_entry(tail, EntryFlag::End).unwrap();
        write_block(val_map, 0, &first);
        write_block(val_map, BLOCK_SIZE, &second);
    }

    /// Opens a reader over `key_map`/`val_map` with `key_index`, panicking on failure.
    fn open_reader(key_map: &Arc<Map>, val_map: &Arc<Map>, key_index: Index) -> SegmentReader {
        SegmentReader::new(
            Arc::clone(key_map),
            Arc::clone(val_map),
            Arc::new(parking_lot::RwLock::new(key_index)),
        )
        .unwrap()
    }

    /// Builds key and value maps via `prepare_blocks_map(num_blocks)` and opens a reader over
    /// them with a fresh index. The returned directories must outlive the reader.
    fn open_blocks_reader(
        num_blocks: usize,
    ) -> (tempfile::TempDir, tempfile::TempDir, SegmentReader) {
        let (key_dir, key_map) = prepare_blocks_map(num_blocks);
        let (val_dir, val_map) = prepare_blocks_map(num_blocks);
        let reader = open_reader(&key_map, &val_map, Index::new(1, INDEX_SEED));
        (key_dir, val_dir, reader)
    }

    #[test]
    fn test_new_segment_reader() {
        let size = BLOCK_SIZE * 4;
        let (_key_dir, key_map) = create_test_map(size);
        let (_val_dir, val_map) = create_test_map(size);
        let reader = SegmentReader::new(
            key_map,
            val_map,
            Arc::new(parking_lot::RwLock::new(Index::new(1, INDEX_SEED))),
        );
        assert!(reader.is_ok());
        assert_eq!(reader.unwrap().num_blocks(), 4);
    }

    #[test]
    fn test_non_aligned_size_accepted() {
        let non_aligned_size = BLOCK_SIZE * 2 + 100;
        let (_key_dir, key_map) = create_test_map(non_aligned_size);
        let (_val_dir, val_map) = create_test_map(non_aligned_size);
        let result = SegmentReader::new(
            key_map,
            val_map,
            Arc::new(parking_lot::RwLock::new(Index::new(1, INDEX_SEED))),
        );
        assert!(
            result.is_ok(),
            "SegmentReader should accept non-aligned sizes"
        );
        // The trailing partial block counts toward `num_blocks`.
        assert_eq!(result.unwrap().num_blocks, 3);
    }

    #[test]
    fn test_read_key_block() {
        let (_key_dir, _val_dir, reader) = open_blocks_reader(4);
        for i in 0..4 {
            let block = reader.read_key_block(i).unwrap();
            assert_eq!(block.get(0).unwrap().1, &vec![i as u8; 16]);
        }
    }

    #[test]
    fn test_read_block_out_of_bounds() {
        let (_key_dir, _val_dir, reader) = open_blocks_reader(2);
        // Only blocks 0 and 1 exist.
        let result = reader.read_key_block(2);
        assert!(matches!(result, Err(ReadOutOfBounds)));
    }

    #[test]
    fn test_read_block_caching() {
        let (_key_dir, _val_dir, reader) = open_blocks_reader(5);
        for i in [0usize, 1, 3, 4] {
            let block = reader.read_key_block(i).unwrap();
            assert_eq!(block.get(0).unwrap().1, &vec![i as u8; 16]);
        }
    }

    #[test]
    fn test_read_block_random_access() {
        let (_key_dir, _val_dir, reader) = open_blocks_reader(8);
        for idx in [3usize, 1, 5, 0, 7, 2] {
            let block = reader.read_key_block(idx).unwrap();
            assert_eq!(block.get(0).unwrap().1, &vec![idx as u8; 16]);
        }
    }

    #[test]
    fn test_segment_block_iterator() {
        let (_key_dir, _val_dir, mut reader) = open_blocks_reader(3);
        let blocks: Vec<crate::block::ReadOnlyBlock> =
            reader.iter().map(|result| result.unwrap()).collect();
        assert_eq!(blocks.len(), 3);
        for (i, block) in blocks.iter().enumerate() {
            assert_eq!(block.get(0).unwrap().1, &vec![i as u8; 16]);
        }
    }

    #[test]
    fn test_seeking_block_iterator() {
        let (_key_dir, _val_dir, mut reader) = open_blocks_reader(5);
        let mut iter = reader.seeking_iter();
        assert_eq!(iter.current_position(), 0);
        let block0 = iter.next().unwrap().unwrap();
        assert_eq!(block0.get(0).unwrap().1, &vec![0u8; 16]);
        iter.seek(3).unwrap();
        assert_eq!(iter.current_position(), 3);
        let block3 = iter.next().unwrap().unwrap();
        assert_eq!(block3.get(0).unwrap().1, &vec![3u8; 16]);
        let result = iter.seek(5);
        assert!(matches!(result, Err(ReadOutOfBounds)));
    }

    #[test]
    fn test_iterator_size_hint() {
        let (_key_dir, key_map) = prepare_blocks_map(5);
        let (_val_dir, val_map) = prepare_blocks_map(5);
        let mut reader = open_reader(&key_map, &val_map, Index::new(1, INDEX_SEED));
        let (min, max) = reader.iter().size_hint();
        assert_eq!(min, 5);
        assert_eq!(max, Some(5));
        let mut reader = open_reader(&key_map, &val_map, Index::new(1, INDEX_SEED));
        let mut seeking_iter = reader.seeking_iter();
        seeking_iter.seek(2).unwrap();
        // Seeking to block 2 of 5 leaves three blocks.
        let (min, max) = seeking_iter.size_hint();
        assert_eq!(min, 3);
        assert_eq!(max, Some(3));
    }

    #[test]
    fn test_blocks_remaining() {
        let (_key_dir, _val_dir, mut reader) = open_blocks_reader(5);
        let mut iter = reader.seeking_iter();
        assert_eq!(iter.blocks_remaining(), 5);
        iter.next();
        assert_eq!(iter.blocks_remaining(), 4);
        iter.seek(3).unwrap();
        assert_eq!(iter.blocks_remaining(), 2);
    }

    #[test]
    fn test_get_with_complete_key() {
        let (_dir, key_map, val_map, mut key_index) = prepare_test_segment_for_get();
        let key = create_test_key(b"test_key");
        let value = b"test_value";
        write_key_entry(&key_map, 0, &key, 0, 0);
        write_value_entry(&val_map, 0, value);
        key_index.insert_item(key_without_ts(&key));
        key_index.inc_block_count(1);
        let reader = open_reader(&key_map, &val_map, key_index);
        let result = reader.get(&key).unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().as_ref(), value);
    }

    #[test]
    fn test_get_with_nonexistent_key() {
        let (_dir, key_map, val_map, key_index) = prepare_test_segment_for_get();
        let reader = open_reader(&key_map, &val_map, key_index);
        let nonexistent_key = create_test_key(b"nonexistent_key");
        assert!(reader.get(&nonexistent_key).unwrap().is_none());
    }

    /// Despite the name, only the value spans blocks: a 1000-byte value is stored as a `Start`
    /// fragment in value block 0 and an `End` fragment in value block 1.
    #[test]
    fn test_get_with_multiblock_key() {
        let (_dir, key_map, val_map, mut key_index) = prepare_test_segment_for_get();
        let key = create_test_key(b"test_key");
        let large_value = vec![b'v'; 1000];
        write_key_entry(&key_map, 0, &key, 0, 0);
        write_split_value(&val_map, &large_value, 500);
        key_index.insert_item(key_without_ts(&key));
        key_index.inc_block_count(1);
        let reader = open_reader(&key_map, &val_map, key_index);
        let retrieved_value = reader.get(&key).unwrap().expect("value should be found");
        assert_eq!(retrieved_value.len(), large_value.len());
        assert_eq!(retrieved_value.as_ref(), large_value.as_slice());
    }

    /// The key is a single `Complete` entry; the 800-byte value is split across two blocks.
    #[test]
    fn test_get_with_multiblock_key_and_value() {
        let (_dir, key_map, val_map, mut key_index) = prepare_test_segment_for_get();
        let key = create_test_key(b"multi_key");
        let value = vec![b'v'; 800];
        write_key_entry(&key_map, 0, &key, 0, 0);
        write_split_value(&val_map, &value, 400);
        key_index.insert_item(key_without_ts(&key));
        key_index.inc_block_count(1);
        assert!(
            key_index.may_contain(key_without_ts(&key)),
            "Key should be in bloom filter"
        );
        assert!(
            key_index.get_block(key_without_ts(&key)).is_some(),
            "Block for key should be found in index"
        );
        let reader = open_reader(&key_map, &val_map, key_index);
        let result = reader.get(&key);
        assert!(result.is_ok(), "Result should be Ok");
        let result_value = result.unwrap();
        assert!(result_value.is_some(), "Value should be found");
        let retrieved_value = result_value.unwrap();
        assert_eq!(retrieved_value.len(), value.len());
        assert_eq!(retrieved_value.as_ref(), value.as_slice());
    }

    #[test]
    fn test_get_with_bloom_filter_no_match() {
        let (_dir, key_map, val_map, mut key_index) = prepare_test_segment_for_get();
        key_index.insert_item(b"other_key1");
        key_index.insert_item(b"other_key2");
        let reader = open_reader(&key_map, &val_map, key_index);
        let test_key = create_test_key(b"test_key");
        assert!(reader.get(&test_key).unwrap().is_none());
    }

    #[test]
    fn test_get_with_empty_blocks() {
        let (_dir, key_map, val_map, mut key_index) = prepare_test_segment_for_get();
        let key = create_test_key(b"test_key");
        key_index.insert_item(key_without_ts(&key));
        let reader = open_reader(&key_map, &val_map, key_index);
        assert!(reader.get(&key).unwrap().is_none());
    }

    /// A `Start` value fragment with no following `End` must surface as `CorruptedBlock`.
    #[test]
    fn test_get_with_corrupted_blocks() {
        let (_dir, key_map, val_map, mut key_index) = prepare_test_segment_for_get();
        let key = create_test_key(b"test_key");
        write_key_entry(&key_map, 0, &key, 0, 0);
        let mut val_block = Block::new();
        val_block.add_entry(b"part1", EntryFlag::Start).unwrap();
        write_block(&val_map, 0, &val_block);
        key_index.insert_item(key_without_ts(&key));
        key_index.inc_block_count(1);
        let reader = open_reader(&key_map, &val_map, key_index);
        let result = reader.get(&key);
        assert!(matches!(result, Err(CorruptedBlock)));
    }

    /// Each key sits in its own key block `n`, pointing at entry 0 of value block `n`.
    #[test]
    fn test_get_with_multiple_keys_in_block() {
        let (_dir, key_map, val_map, mut key_index) = prepare_test_segment_for_get();
        let cases: [(&[u8], &[u8]); 3] = [
            (b"test_key1", b"value1"),
            (b"test_key2", b"value2"),
            (b"test_key3", b"value3"),
        ];
        let keys: Vec<Vec<u8>> = cases
            .iter()
            .map(|&(user_key, _)| create_test_key(user_key))
            .collect();
        for (n, (key, &(_, value))) in keys.iter().zip(&cases).enumerate() {
            write_key_entry(&key_map, n * BLOCK_SIZE, key, n as u64, 0);
            write_value_entry(&val_map, n * BLOCK_SIZE, value);
        }
        for key in &keys {
            key_index.insert_item(key_without_ts(key));
            key_index.inc_block_count(1);
        }
        let reader = open_reader(&key_map, &val_map, key_index);
        for (key, &(_, value)) in keys.iter().zip(&cases) {
            let result = reader.get(key).unwrap();
            assert!(result.is_some());
            assert_eq!(result.unwrap().as_ref(), value);
        }
    }
}