use std::collections::Bound;
use bytes::{
Bytes,
BytesMut,
};
use crate::{
block::{
Block,
EntryFlag,
},
errs::{
SegmentError,
SegmentError::ReadOutOfBounds,
},
keypair::{
KeyBytes,
ValueBytes,
},
segment_reader::SegmentReader,
utils::Deserializer,
};
pub(crate) fn convert_bound_to_bytes(bound: Bound<&[u8]>) -> Bound<Bytes> {
match bound {
| Bound::Included(data) => Bound::Included(Bytes::copy_from_slice(data)),
| Bound::Excluded(data) => Bound::Excluded(Bytes::copy_from_slice(data)),
| Bound::Unbounded => Bound::Unbounded,
}
}
pub(crate) struct SegmentBlockIterator<'a> {
reader: &'a mut SegmentReader,
current_block: usize,
}
impl SegmentBlockIterator<'_> {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub(crate) fn new<'a>(reader: &'a mut SegmentReader) -> SegmentBlockIterator<'a> {
SegmentBlockIterator {
reader,
current_block: 0,
}
}
}
impl<'a> Iterator for SegmentBlockIterator<'a> {
type Item = Result<crate::block::ReadOnlyBlock, SegmentError>;
fn next(&mut self) -> Option<Self::Item> {
if self.current_block >= self.reader.num_blocks {
return None;
}
let result = self.reader.read_key_block(self.current_block);
self.current_block += 1;
Some(result)
}
fn size_hint(&self) -> (usize, Option<usize>) {
let remaining = self.reader.num_blocks - self.current_block;
(remaining, Some(remaining))
}
}
pub(crate) struct SeekingBlockIterator<'a> {
start: usize,
end: usize,
current: usize,
reader: &'a mut SegmentReader,
}
impl<'a> SeekingBlockIterator<'a> {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub(crate) fn new<'b>(
reader: &'b mut SegmentReader,
start: usize,
end: usize,
) -> SeekingBlockIterator<'b> {
SeekingBlockIterator {
start,
end,
current: start,
reader,
}
}
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub(crate) fn seek(&mut self, block_index: usize) -> Result<(), SegmentError> {
if block_index >= self.end {
return Err(ReadOutOfBounds);
}
self.current = block_index;
Ok(())
}
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub(crate) fn current_position(&self) -> usize {
self.current
}
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub(crate) fn blocks_remaining(&self) -> usize {
self.end - self.current
}
}
impl<'a> Iterator for SeekingBlockIterator<'a> {
type Item = Result<crate::block::ReadOnlyBlock, SegmentError>;
fn next(&mut self) -> Option<Self::Item> {
if self.current >= self.end {
return None;
}
let result = self.reader.read_key_block(self.current);
self.current += 1;
Some(result)
}
fn size_hint(&self) -> (usize, Option<usize>) {
let remaining = self.end - self.current;
(remaining, Some(remaining))
}
}
pub struct SegmentScanIterator {
reader: SegmentReader,
current_block_index: usize,
current_key_block: Option<crate::block::ReadOnlyBlock>,
current_key_index: usize,
lower_bound: Bound<Bytes>,
upper_bound: Bound<Bytes>,
is_upper_inclusive: bool,
is_lower_inclusive: bool,
}
impl Iterator for SegmentScanIterator {
type Item = Result<(KeyBytes, ValueBytes), SegmentError>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if self.current_key_block.is_none() ||
self.current_key_index >=
self.current_key_block.as_ref().unwrap().num_entries() as usize
{
match self.load_next_block() {
| Ok(false) => return None, // No more blocks
| Ok(true) => {},
| Err(e) => return Some(Err(e)), }
}
let key_block = self.current_key_block.as_ref().unwrap();
match key_block.get(self.current_key_index) {
| Some((flag, data)) => {
self.current_key_index += 1;
let key_bytes = match flag {
| EntryFlag::Complete => Bytes::copy_from_slice(data),
| EntryFlag::Start => {
match self.read_full_key(flag, data) {
| Ok(bytes) => bytes,
| Err(e) => return Some(Err(e)),
}
},
| _ => continue, // Skip middle or end entries
};
use crate::segment::KEY_DATA_OFFSET;
let key_data = key_bytes.slice(KEY_DATA_OFFSET..);
if !self.is_in_range(&key_data) {
continue; }
let key = KeyBytes::deserialize(key_data);
let val_bytes = match self.read_value_for_key(&key_bytes) {
| Ok(Some(bytes)) => bytes,
| Ok(None) => continue, // No value found, skip this key
| Err(e) => return Some(Err(e)),
};
let value = ValueBytes::deserialize(val_bytes);
return Some(Ok((key, value)));
},
| None => {
self.current_key_block = None;
},
}
}
}
}
impl SegmentScanIterator {
pub fn new(
reader: SegmentReader,
range: (Bound<&[u8]>, Bound<&[u8]>),
start_block: usize,
) -> Self {
let lower_bound = convert_bound_to_bytes(range.0);
let upper_bound = convert_bound_to_bytes(range.1);
let is_lower_inclusive = matches!(lower_bound, Bound::Included(_));
let is_upper_inclusive = matches!(upper_bound, Bound::Included(_));
Self {
reader,
current_block_index: start_block,
current_key_block: None,
current_key_index: 0,
lower_bound,
upper_bound,
is_upper_inclusive,
is_lower_inclusive,
}
}
fn load_next_block(&mut self) -> Result<bool, SegmentError> {
if self.current_block_index >= self.reader.visible_key_blocks {
return Ok(false);
}
match self.reader.read_key_block(self.current_block_index) {
| Ok(block) => {
self.current_key_block = Some(block);
self.current_key_index = 0;
self.current_block_index += 1;
Ok(true)
},
| Err(e) => {
self.current_block_index += 1;
Err(e)
},
}
}
fn should_skip_current_block(&self) -> Option<bool> {
if matches!(self.lower_bound, Bound::Unbounded) {
return Some(false);
}
let key_block = match self.current_key_block.as_ref() {
| Some(b) => b,
| None => return None,
};
let num_entries = key_block.num_entries() as usize;
if num_entries == 0 {
return Some(true); }
let last_entry_idx = num_entries - 1;
for idx in (0..num_entries).rev() {
if let Some((flag, data)) = key_block.get(idx) {
match flag {
| EntryFlag::Complete => {
let key_bytes = Bytes::copy_from_slice(data);
use crate::segment::KEY_DATA_OFFSET;
if key_bytes.len() < KEY_DATA_OFFSET {
return None; }
let key_data = key_bytes.slice(KEY_DATA_OFFSET..);
let before_lower = match &self.lower_bound {
| Bound::Included(lower) => key_data.as_ref() < lower.as_ref(),
| Bound::Excluded(lower) => key_data.as_ref() <= lower.as_ref(),
| Bound::Unbounded => false,
};
return Some(before_lower);
},
| _ => continue, // Skip multi-block entries, check previous
}
}
}
None }
fn read_full_key(&self, flag: EntryFlag, initial_data: &[u8]) -> Result<Bytes, SegmentError> {
self.reader
.read_multiblock_entry(flag, initial_data, self.current_block_index - 1)
}
fn is_in_range(&self, key: &Bytes) -> bool {
let satisfies_lower = match &self.lower_bound {
| Bound::Included(lower) => key.as_ref() >= lower.as_ref(),
| Bound::Excluded(lower) => key.as_ref() > lower.as_ref(),
| Bound::Unbounded => true,
};
let satisfies_upper = match &self.upper_bound {
| Bound::Included(upper) => key.as_ref() <= upper.as_ref(),
| Bound::Excluded(upper) => key.as_ref() < upper.as_ref(),
| Bound::Unbounded => true,
};
satisfies_lower && satisfies_upper
}
fn is_past_upper_bound(&self, key: &Bytes) -> bool {
match &self.upper_bound {
| Bound::Included(upper) => key.as_ref() > upper.as_ref(),
| Bound::Excluded(upper) => key.as_ref() >= upper.as_ref(),
| Bound::Unbounded => false,
}
}
fn read_value_for_key(&self, key: &Bytes) -> Result<Option<Bytes>, SegmentError> {
if key.len() < 10 {
return Ok(None); }
let value_block_num = u64::from_le_bytes(key[0..8].try_into().unwrap());
let value_entry_index = u16::from_le_bytes(key[8..10].try_into().unwrap());
match self
.reader
.read_value(value_block_num as usize, value_entry_index as usize)
{
| Ok(value) => Ok(Some(value)),
| Err(e) => Err(e),
}
}
}
pub(crate) struct RawSegmentScanIterator {
reader: SegmentReader,
current_block_index: usize,
current_key_block: Option<crate::block::ReadOnlyBlock>,
current_key_index: usize,
lower_bound: Bound<Bytes>,
upper_bound: Bound<Bytes>,
is_upper_inclusive: bool,
is_lower_inclusive: bool,
}
impl Iterator for RawSegmentScanIterator {
type Item = Result<crate::raw_entry::RawEntry, SegmentError>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if self.current_key_block.is_none() ||
self.current_key_index >=
self.current_key_block.as_ref().unwrap().num_entries() as usize
{
match self.load_next_block() {
| Ok(false) => return None,
| Ok(true) => {},
| Err(e) => return Some(Err(e)),
}
}
let key_block = self.current_key_block.as_ref().unwrap();
match key_block.get(self.current_key_index) {
| Some((flag, data)) => {
self.current_key_index += 1;
let key_bytes = match flag {
| EntryFlag::Complete => Bytes::copy_from_slice(data),
| EntryFlag::Start => match self.read_full_key(flag, data) {
| Ok(bytes) => bytes,
| Err(e) => return Some(Err(e)),
},
| _ => continue,
};
use crate::segment::KEY_DATA_OFFSET;
let key_data = key_bytes.slice(KEY_DATA_OFFSET..);
if !self.is_in_range(&key_data) {
continue;
}
let val_bytes = match self.read_value_for_key(&key_bytes) {
| Ok(Some(bytes)) => bytes,
| Ok(None) => continue,
| Err(e) => return Some(Err(e)),
};
return Some(Ok(crate::raw_entry::RawEntry::new(key_data, val_bytes)));
},
| None => {
self.current_key_block = None;
},
}
}
}
}
impl RawSegmentScanIterator {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
pub fn new(
reader: SegmentReader,
range: (Bound<&[u8]>, Bound<&[u8]>),
start_block: usize,
) -> Self {
let lower_bound = convert_bound_to_bytes(range.0);
let upper_bound = convert_bound_to_bytes(range.1);
let is_lower_inclusive = matches!(lower_bound, Bound::Included(_));
let is_upper_inclusive = matches!(upper_bound, Bound::Included(_));
Self {
reader,
current_block_index: start_block,
current_key_block: None,
current_key_index: 0,
lower_bound,
upper_bound,
is_upper_inclusive,
is_lower_inclusive,
}
}
fn load_next_block(&mut self) -> Result<bool, SegmentError> {
if self.current_block_index >= self.reader.visible_key_blocks {
return Ok(false);
}
match self.reader.read_key_block(self.current_block_index) {
| Ok(block) => {
self.current_key_block = Some(block);
self.current_key_index = 0;
self.current_block_index += 1;
Ok(true)
},
| Err(e) => {
self.current_block_index += 1;
Err(e)
},
}
}
fn should_skip_current_block(&self) -> Option<bool> {
if matches!(self.lower_bound, Bound::Unbounded) {
return Some(false);
}
let key_block = match self.current_key_block.as_ref() {
| Some(b) => b,
| None => return None,
};
let num_entries = key_block.num_entries() as usize;
if num_entries == 0 {
return Some(true);
}
for idx in (0..num_entries).rev() {
if let Some((flag, data)) = key_block.get(idx) {
match flag {
| EntryFlag::Complete => {
let key_bytes = Bytes::copy_from_slice(data);
use crate::segment::KEY_DATA_OFFSET;
if key_bytes.len() < KEY_DATA_OFFSET {
return None;
}
let key_data = key_bytes.slice(KEY_DATA_OFFSET..);
let before_lower = match &self.lower_bound {
| Bound::Included(lower) => key_data.as_ref() < lower.as_ref(),
| Bound::Excluded(lower) => key_data.as_ref() <= lower.as_ref(),
| Bound::Unbounded => false,
};
return Some(before_lower);
},
| _ => continue,
}
}
}
None
}
fn read_full_key(&self, flag: EntryFlag, initial_data: &[u8]) -> Result<Bytes, SegmentError> {
self.reader
.read_multiblock_entry(flag, initial_data, self.current_block_index - 1)
}
fn is_in_range(&self, key: &Bytes) -> bool {
let satisfies_lower = match &self.lower_bound {
| Bound::Included(lower) => key.as_ref() >= lower.as_ref(),
| Bound::Excluded(lower) => key.as_ref() > lower.as_ref(),
| Bound::Unbounded => true,
};
let satisfies_upper = match &self.upper_bound {
| Bound::Included(upper) => key.as_ref() <= upper.as_ref(),
| Bound::Excluded(upper) => key.as_ref() < upper.as_ref(),
| Bound::Unbounded => true,
};
satisfies_lower && satisfies_upper
}
fn is_past_upper_bound(&self, key: &Bytes) -> bool {
match &self.upper_bound {
| Bound::Included(upper) => key.as_ref() > upper.as_ref(),
| Bound::Excluded(upper) => key.as_ref() >= upper.as_ref(),
| Bound::Unbounded => false,
}
}
fn read_value_for_key(&self, key: &Bytes) -> Result<Option<Bytes>, SegmentError> {
if key.len() < 10 {
return Ok(None);
}
let value_block_num = u64::from_le_bytes(key[0..8].try_into().unwrap());
let value_entry_index = u16::from_le_bytes(key[8..10].try_into().unwrap());
match self
.reader
.read_value(value_block_num as usize, value_entry_index as usize)
{
| Ok(value) => Ok(Some(value)),
| Err(e) => Err(e),
}
}
}
#[cfg(test)]
#[allow(clippy::question_mark_used)]
#[allow(clippy::missing_safety_doc)]
#[allow(clippy::undocumented_unsafe_blocks)]
mod tests {
use std::{
collections::Bound,
ops::Range,
path::PathBuf,
sync::Arc,
};
use bytes::{
Bytes,
BytesMut,
};
use tempfile::tempdir;
use super::*;
use crate::{
block::{
BLOCK_SIZE,
Block,
EntryFlag,
},
errs::SegmentError,
index::Index,
keypair::{
DEFAULT_NS,
KeyBytes,
ValueBytes,
},
map::Map,
segment_reader::{
ReadConfig,
SegmentReader,
},
};
fn create_test_segment_reader(
num_key_blocks: usize,
num_val_blocks: usize,
key_index: Index,
val_index: Index,
) -> (SegmentReader, tempfile::TempDir) {
let dir = tempdir().expect("failed to create temp dir");
let key_path = dir.path().join("test-key-segment");
let key_map = Arc::new(
Map::new(key_path, (num_key_blocks * BLOCK_SIZE) as u64)
.expect("failed to create key map"),
);
let val_path = dir.path().join("test-val-segment");
let val_map = Arc::new(
Map::new(val_path, (num_val_blocks * BLOCK_SIZE) as u64)
.expect("failed to create val map"),
);
for i in 0..num_key_blocks {
let mut block = Block::new();
let key_data = format!("key_{}", i).into_bytes();
block
.add_entry(&key_data, EntryFlag::Complete)
.expect("Failed to add entry to block");
let offset = i * BLOCK_SIZE;
let block_range = offset..(offset + BLOCK_SIZE);
key_map
.write_to_range(block_range, |slice| unsafe {
block.finalize(slice.as_mut_ptr());
})
.expect("Failed to write key block");
}
for i in 0..num_val_blocks {
let mut block = Block::new();
let val_data = format!("value_{}", i).into_bytes();
block
.add_entry(&val_data, EntryFlag::Complete)
.expect("Failed to add entry to block");
let offset = i * BLOCK_SIZE;
let block_range = offset..(offset + BLOCK_SIZE);
val_map
.write_to_range(block_range, |slice| unsafe {
block.finalize(slice.as_mut_ptr());
})
.expect("Failed to write value block");
}
let reader = SegmentReader::new(
key_map,
val_map,
Arc::new(parking_lot::RwLock::new(key_index)),
)
.expect("Failed to create segment reader");
(reader, dir)
}
fn create_multi_block_segment(
mut key_index: Index,
mut val_index: Index,
) -> (SegmentReader, tempfile::TempDir) {
let dir = tempdir().expect("failed to create temp dir");
let key_path = dir.path().join("multi-key-segment");
let key_map = Arc::new(
Map::new(key_path, (5 * BLOCK_SIZE) as u64).expect("failed to create key map"),
);
let val_path = dir.path().join("multi-val-segment");
let val_map = Arc::new(
Map::new(val_path, (5 * BLOCK_SIZE) as u64).expect("failed to create val map"),
);
let mut block0 = Block::new();
let mut key0 = Vec::new();
key0.extend_from_slice(&[0u8; 8]); key0.extend_from_slice(b"regular_key");
key0.extend_from_slice(&[0u8; 16]);
block0
.add_entry(&key0, EntryFlag::Complete)
.expect("Failed to add entry");
key_map
.write_to_range(0..BLOCK_SIZE, |slice| unsafe {
block0.finalize(slice.as_mut_ptr());
})
.expect("Failed to write block0");
let mut block1 = Block::new();
let mut key_part1 = Vec::new();
key_part1.extend_from_slice(&[0u8; 8]); key_part1.extend_from_slice(b"multi_key_part1");
block1
.add_entry(&key_part1, EntryFlag::Start)
.expect("Failed to add entry");
key_map
.write_to_range(BLOCK_SIZE..(2 * BLOCK_SIZE), |slice| unsafe {
block1.finalize(slice.as_mut_ptr());
})
.expect("Failed to write block1");
let mut block2 = Block::new();
let key_part2 = b"multi_key_part2";
block2
.add_entry(key_part2, EntryFlag::End)
.expect("Failed to add entry");
key_map
.write_to_range((2 * BLOCK_SIZE)..(3 * BLOCK_SIZE), |slice| unsafe {
block2.finalize(slice.as_mut_ptr());
})
.expect("Failed to write block2");
let mut val_block0 = Block::new();
let mut val0 = Vec::new();
val0.extend_from_slice(&[0u8; 8]); val0.extend_from_slice(b"regular_value");
val_block0
.add_entry(&val0, EntryFlag::Complete)
.expect("Failed to add entry");
val_map
.write_to_range(0..BLOCK_SIZE, |slice| unsafe {
val_block0.finalize(slice.as_mut_ptr());
})
.expect("Failed to write val_block0");
let mut val_block1 = Block::new();
let mut val_part1 = Vec::new();
val_part1.extend_from_slice(&[0u8; 8]); val_part1.extend_from_slice(b"multi_value_part1");
val_block1
.add_entry(&val_part1, EntryFlag::Start)
.expect("Failed to add entry");
val_map
.write_to_range(BLOCK_SIZE..(2 * BLOCK_SIZE), |slice| unsafe {
val_block1.finalize(slice.as_mut_ptr());
})
.expect("Failed to write val_block1");
let mut val_block2 = Block::new();
let val_part2 = b"multi_value_part2";
val_block2
.add_entry(val_part2, EntryFlag::End)
.expect("Failed to add entry");
val_map
.write_to_range((2 * BLOCK_SIZE)..(3 * BLOCK_SIZE), |slice| unsafe {
val_block2.finalize(slice.as_mut_ptr());
})
.expect("Failed to write val_block2");
key_index.inc_block_count(1);
key_index.insert_item(&key0);
let mut multi_key = Vec::new();
multi_key.extend_from_slice(&[0u8; 8]); multi_key.extend_from_slice(b"multi_key_part1");
multi_key.extend_from_slice(b"multi_key_part2");
multi_key.extend_from_slice(&[0u8; 16]);
key_index.inc_block_count(1);
key_index.insert_item(&multi_key);
val_index.inc_block_count(1);
val_index.insert_item(&key0);
val_index.inc_block_count(1);
val_index.insert_item(&multi_key);
let reader = SegmentReader::new(
key_map,
val_map,
Arc::new(parking_lot::RwLock::new(key_index)),
)
.expect("Failed to create segment reader");
(reader, dir)
}
fn create_scan_test_segment(
mut key_index: Index,
mut val_index: Index,
) -> (SegmentReader, tempfile::TempDir) {
let dir = tempdir().expect("failed to create temp dir");
let key_path = dir.path().join("scan-key-segment");
let key_map = Arc::new(
Map::new(key_path, (10 * BLOCK_SIZE) as u64).expect("failed to create key map"),
);
let val_path = dir.path().join("scan-val-segment");
let val_map = Arc::new(
Map::new(val_path, (10 * BLOCK_SIZE) as u64).expect("failed to create val map"),
);
let test_keys = [
([0u8, 0, 0, 0, 0, 0, 0, 0] as [u8; 8], b"key_a", b"value_a"),
([0u8, 0, 0, 0, 0, 0, 0, 0] as [u8; 8], b"key_b", b"value_b"),
([0u8, 0, 0, 0, 0, 0, 0, 0] as [u8; 8], b"key_c", b"value_c"),
([0u8, 0, 0, 0, 0, 0, 0, 0] as [u8; 8], b"key_d", b"value_d"),
([0u8, 0, 0, 0, 0, 0, 0, 0] as [u8; 8], b"key_e", b"value_e"),
([1u8, 0, 0, 0, 0, 0, 0, 0] as [u8; 8], b"key_a", b"ns1_val"),
([1u8, 0, 0, 0, 0, 0, 0, 0] as [u8; 8], b"key_z", b"ns1_val"),
];
for (i, (ns, key, value)) in test_keys.iter().enumerate() {
let mut full_key = Vec::with_capacity(ns.len() + key.len() + 16);
full_key.extend_from_slice(ns); full_key.extend_from_slice(key.as_ref()); full_key.extend_from_slice(&[0u8; 16]);
let mut full_value = Vec::with_capacity(ns.len() + value.len());
full_value.extend_from_slice(ns);
full_value.extend_from_slice(value.as_ref());
let mut key_block = Block::new();
key_block
.add_entry(&full_key, EntryFlag::Complete)
.expect("Failed to add key entry");
let mut val_block = Block::new();
val_block
.add_entry(&full_value, EntryFlag::Complete)
.expect("Failed to add value entry");
let offset = i * BLOCK_SIZE;
key_map
.write_to_range(offset..(offset + BLOCK_SIZE), |slice| unsafe {
key_block.finalize(slice.as_mut_ptr());
})
.expect("Failed to write key block");
val_map
.write_to_range(offset..(offset + BLOCK_SIZE), |slice| unsafe {
val_block.finalize(slice.as_mut_ptr());
})
.expect("Failed to write value block");
let key_without_ts = &full_key[..full_key.len() - 16];
key_index.inc_block_count(1);
key_index.insert_item(key_without_ts);
val_index.inc_block_count(1);
val_index.insert_item(key_without_ts);
}
let reader = SegmentReader::new(
key_map,
val_map,
Arc::new(parking_lot::RwLock::new(key_index)),
)
.expect("Failed to create segment reader");
(reader, dir)
}
#[test]
fn test_convert_bound_to_bytes() {
let data = b"test_data";
let included = Bound::Included(data as &[u8]);
match convert_bound_to_bytes(included) {
| Bound::Included(bytes) => {
assert_eq!(bytes.as_ref(), data);
},
| _ => panic!("Expected Included bound"),
}
let excluded = Bound::Excluded(data as &[u8]);
match convert_bound_to_bytes(excluded) {
| Bound::Excluded(bytes) => {
assert_eq!(bytes.as_ref(), data);
},
| _ => panic!("Expected Excluded bound"),
}
let unbounded = Bound::Unbounded;
match convert_bound_to_bytes(unbounded) {
| Bound::Unbounded => {
},
| _ => panic!("Expected Unbounded bound"),
}
}
#[test]
fn test_segment_block_iterator_empty() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let (mut reader, _dir) = create_test_segment_reader(0, 0, key_index, val_index);
let iter = SegmentBlockIterator::new(&mut reader);
let blocks: Vec<_> = iter.collect();
assert!(blocks.is_empty(), "Expected no blocks from empty segment");
}
#[test]
fn test_segment_block_iterator_single_block() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let (mut reader, _dir) = create_test_segment_reader(1, 1, key_index, val_index);
let iter = SegmentBlockIterator::new(&mut reader);
let blocks: Vec<_> = iter.collect();
assert_eq!(blocks.len(), 1, "Expected 1 block");
let block = blocks[0].as_ref().expect("Expected successful block read");
assert_eq!(block.num_entries(), 1, "Expected 1 entry in the block");
if let Some((flag, data)) = block.get(0) {
assert_eq!(flag, EntryFlag::Complete, "Expected Complete flag");
assert_eq!(data, b"key_0", "Expected key_0 as data");
} else {
panic!("Expected entry in block");
}
}
#[test]
fn test_segment_block_iterator_multiple_blocks() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let num_blocks = 5;
let (mut reader, _dir) =
create_test_segment_reader(num_blocks, num_blocks, key_index, val_index);
let iter = SegmentBlockIterator::new(&mut reader);
let blocks: Vec<_> = iter.collect();
assert_eq!(blocks.len(), num_blocks, "Expected 5 blocks");
for (i, block_result) in blocks.iter().enumerate() {
match block_result {
| Ok(block) => {
assert_eq!(block.num_entries(), 1, "Expected 1 entry in block {}", i);
if let Some((flag, data)) = block.get(0) {
assert_eq!(
flag,
EntryFlag::Complete,
"Expected Complete flag for block {}",
i
);
let expected_data = format!("key_{}", i).into_bytes();
assert_eq!(data, expected_data.as_slice(), "Expected key_{} as data", i);
} else {
panic!("Expected entry in block {}", i);
}
},
| Err(e) => {
panic!("Failed to read block {}: {:?}", i, e);
},
}
}
}
#[test]
fn test_segment_block_iterator_size_hint() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let num_blocks = 3;
let (mut reader, _dir) =
create_test_segment_reader(num_blocks, num_blocks, key_index, val_index);
let mut iter = SegmentBlockIterator::new(&mut reader);
let (min, max) = iter.size_hint();
assert_eq!(
min, num_blocks,
"Initial min size hint should match num_blocks"
);
assert_eq!(
max,
Some(num_blocks),
"Initial max size hint should match num_blocks"
);
let _ = iter.next();
let (min, max) = iter.size_hint();
assert_eq!(min, num_blocks - 1, "Min size hint should decrease");
assert_eq!(max, Some(num_blocks - 1), "Max size hint should decrease");
let _ = iter.next();
let _ = iter.next();
let (min, max) = iter.size_hint();
assert_eq!(min, 0, "Min size hint should be 0 after all items consumed");
assert_eq!(
max,
Some(0),
"Max size hint should be 0 after all items consumed"
);
}
#[test]
fn test_segment_block_iterator_error_handling() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let (mut reader, _dir) = create_test_segment_reader(5, 5, key_index, val_index);
reader.visible_key_blocks = 0;
reader.num_blocks = 0;
let iter = SegmentBlockIterator::new(&mut reader);
let results: Vec<_> = iter.collect();
assert_eq!(
results.len(),
0,
"Expected no blocks due to visibility settings"
);
}
#[test]
fn test_seeking_block_iterator_empty_range() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let (mut reader, _dir) = create_test_segment_reader(5, 5, key_index, val_index);
let iter = SeekingBlockIterator::new(&mut reader, 2, 2);
let blocks: Vec<_> = iter.collect();
assert!(blocks.is_empty(), "Expected no blocks from empty range");
}
#[test]
fn test_seeking_block_iterator_full_range() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let num_blocks = 5;
let (mut reader, _dir) =
create_test_segment_reader(num_blocks, num_blocks, key_index, val_index);
let iter = SeekingBlockIterator::new(&mut reader, 0, num_blocks);
let blocks: Vec<_> = iter.collect();
assert_eq!(blocks.len(), num_blocks, "Expected all blocks");
for (i, block_result) in blocks.iter().enumerate() {
match block_result {
| Ok(block) => {
if let Some((flag, data)) = block.get(0) {
let expected_data = format!("key_{}", i).into_bytes();
assert_eq!(data, expected_data.as_slice(), "Expected key_{} as data", i);
}
},
| Err(e) => {
panic!("Failed to read block {}: {:?}", i, e);
},
}
}
}
#[test]
fn test_seeking_block_iterator_partial_range() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let num_blocks = 5;
let (mut reader, _dir) =
create_test_segment_reader(num_blocks, num_blocks, key_index, val_index);
let iter = SeekingBlockIterator::new(&mut reader, 1, 4);
let blocks: Vec<_> = iter.collect();
assert_eq!(blocks.len(), 3, "Expected 3 blocks in range [1, 4)");
for (i, block_result) in blocks.iter().enumerate() {
match block_result {
| Ok(block) => {
if let Some((flag, data)) = block.get(0) {
let expected_data = format!("key_{}", i + 1).into_bytes();
assert_eq!(
data,
expected_data.as_slice(),
"Expected key_{} as data",
i + 1
);
}
},
| Err(e) => {
panic!("Failed to read block {}: {:?}", i + 1, e);
},
}
}
}
#[test]
fn test_seeking_block_iterator_seek() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let num_blocks = 5;
let (mut reader, _dir) =
create_test_segment_reader(num_blocks, num_blocks, key_index, val_index);
let mut iter = SeekingBlockIterator::new(&mut reader, 0, num_blocks);
assert_eq!(iter.current_position(), 0, "Initial position should be 0");
match iter.seek(2) {
| Ok(()) => {},
| Err(e) => panic!("Seek failed: {:?}", e),
}
assert_eq!(
iter.current_position(),
2,
"Position after seek should be 2"
);
if let Some(Ok(block)) = iter.next() {
if let Some((_, data)) = block.get(0) {
assert_eq!(data, b"key_2", "Expected key_2 after seek");
} else {
panic!("Expected entry in block after seek");
}
} else {
panic!("Expected valid block after seek");
}
match iter.seek(num_blocks - 1) {
| Ok(()) => {},
| Err(e) => panic!("Seek to end failed: {:?}", e),
}
if let Some(Ok(block)) = iter.next() {
if let Some((_, data)) = block.get(0) {
assert_eq!(
data,
format!("key_{}", num_blocks - 1).as_bytes(),
"Expected key_{} after seek to end",
num_blocks - 1
);
}
} else {
panic!("Expected valid block after seek to end");
}
assert!(
iter.next().is_none(),
"Expected no more blocks after reading the last one"
);
}
#[test]
fn test_seeking_block_iterator_seek_out_of_bounds() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let num_blocks = 5;
let (mut reader, _dir) =
create_test_segment_reader(num_blocks, num_blocks, key_index, val_index);
let mut iter = SeekingBlockIterator::new(&mut reader, 0, num_blocks);
let result = iter.seek(num_blocks);
assert!(result.is_err(), "Expected error when seeking out of bounds");
match result {
| Err(SegmentError::ReadOutOfBounds) => {
},
| _ => panic!("Expected ReadOutOfBounds error"),
}
assert_eq!(
iter.current_position(),
0,
"Position should not change after failed seek"
);
}
#[test]
fn test_seeking_block_iterator_blocks_remaining() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let num_blocks = 5;
let (mut reader, _dir) =
create_test_segment_reader(num_blocks, num_blocks, key_index, val_index);
let mut iter = SeekingBlockIterator::new(&mut reader, 0, num_blocks);
assert_eq!(
iter.blocks_remaining(),
num_blocks,
"Initial blocks_remaining should be num_blocks"
);
iter.next();
assert_eq!(
iter.blocks_remaining(),
num_blocks - 1,
"blocks_remaining should decrease after reading"
);
iter.seek(3).expect("Seek should succeed");
assert_eq!(
iter.blocks_remaining(),
num_blocks - 3,
"blocks_remaining should reflect position after seek"
);
}
#[test]
fn test_seeking_block_iterator_size_hint() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let num_blocks = 5;
let (mut reader, _dir) =
create_test_segment_reader(num_blocks, num_blocks, key_index, val_index);
let mut iter = SeekingBlockIterator::new(&mut reader, 0, num_blocks);
let (min, max) = iter.size_hint();
assert_eq!(
min, num_blocks,
"Initial min size hint should match num_blocks"
);
assert_eq!(
max,
Some(num_blocks),
"Initial max size hint should match num_blocks"
);
let _ = iter.next();
let (min, max) = iter.size_hint();
assert_eq!(min, num_blocks - 1, "Min size hint should decrease");
assert_eq!(max, Some(num_blocks - 1), "Max size hint should decrease");
iter.seek(3).expect("Seek should succeed");
let (min, max) = iter.size_hint();
assert_eq!(
min,
num_blocks - 3,
"Min size hint should reflect position after seek"
);
assert_eq!(
max,
Some(num_blocks - 3),
"Max size hint should reflect position after seek"
);
}
#[test]
fn test_segment_scan_iterator_empty_range() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let (reader, _dir) = create_scan_test_segment(key_index, val_index);
let mut lower = vec![0u8; 8]; lower.extend_from_slice(b"key_z"); lower.extend_from_slice(&[0u8; 16]);
let mut upper = vec![0u8; 8]; upper.extend_from_slice(b"key_a"); upper.extend_from_slice(&[0u8; 16]);
let iter = reader.scan(Bound::Included(&lower), Bound::Included(&upper));
let results: Vec<_> = iter.collect();
assert!(results.is_empty(), "Expected no results for empty range");
}
#[test]
fn test_segment_scan_iterator_inclusive_bounds() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let (reader, _dir) = create_scan_test_segment(key_index, val_index);
assert!(
reader.read_key_block(0).is_ok(),
"Should be able to read block 0"
);
assert!(
reader.read_key_block(1).is_ok(),
"Should be able to read block 1"
);
assert!(
reader.read_key_block(2).is_ok(),
"Should be able to read block 2"
);
}
#[test]
fn test_segment_scan_iterator_exclusive_bounds() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let (reader, _dir) = create_scan_test_segment(key_index, val_index);
assert!(reader.num_blocks() > 0, "Should have blocks available");
let (key_blocks, val_blocks) = reader.visible_blocks();
assert!(key_blocks > 0, "Should have visible key blocks");
assert!(val_blocks > 0, "Should have visible value blocks");
}
#[test]
fn test_segment_scan_iterator_mixed_bounds() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let (mut reader, _dir) = create_scan_test_segment(key_index, val_index);
let blocks: Vec<_> = reader.iter().collect();
assert!(
!blocks.is_empty(),
"Should get blocks using SegmentBlockIterator"
);
}
#[test]
fn test_segment_scan_seeking_iterator_mixed_bounds() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let (mut reader, _dir) = create_scan_test_segment(key_index, val_index);
let mut seeker = reader.seeking_iter();
assert!(
seeker.next().is_some(),
"Should get at least one block with SeekingBlockIterator"
);
}
#[test]
fn test_segment_scan_iterator_unbounded() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let (reader, _dir) = create_scan_test_segment(key_index, val_index);
let block0 = reader.read_key_block(0);
assert!(block0.is_ok(), "Should be able to read block 0");
let block = block0.unwrap();
assert!(block.num_entries() > 0, "Block should have entries");
}
#[test]
fn test_segment_scan_iterator_namespace_filtering() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let (reader, _dir) = create_scan_test_segment(key_index, val_index);
assert!(reader.num_blocks() > 0, "Expected blocks in segment");
}
#[test]
fn test_segment_scan_iterator_non_existent_keys() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let (reader, _dir) = create_scan_test_segment(key_index, val_index);
let non_existent_key = &[
0u8, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b't', b'_', b'f', b'o', b'u', b'n', b'd',
][..];
assert!(
!reader.key_index.write().may_contain(non_existent_key),
"Bloom filter should not contain non-existent key"
);
assert!(
reader
.key_index
.write()
.get_block(non_existent_key)
.is_none(),
"Should not find block for non-existent key"
);
}
#[test]
fn test_is_in_range() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let (reader, _dir) = create_scan_test_segment(key_index, val_index);
assert!(
reader.key_index.write().block_count() > 0,
"Expected blocks in key index"
);
}
#[test]
fn test_is_past_upper_bound() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let (mut reader, _dir) = create_scan_test_segment(key_index, val_index);
reader.visible_key_blocks = 2;
let block0 = reader.read_key_block(0);
assert!(block0.is_ok(), "Should be able to read first block");
let block1 = reader.read_key_block(1);
assert!(block1.is_ok(), "Should be able to read second block");
let block2 = reader.read_key_block(2);
assert!(
block2.is_err(),
"Should get error when reading beyond visibility limit"
);
}
#[test]
fn test_segment_scan_iterator_errors() {
let seed = 42i64;
let key_index = Index::new(1, seed);
let val_index = Index::new(2, seed);
let (mut reader, _dir) = create_scan_test_segment(key_index, val_index);
reader.visible_key_blocks = 0;
let iter = reader.scan(Bound::Unbounded, Bound::Unbounded);
let results: Vec<_> = iter.collect();
assert_eq!(
results.len(),
0,
"Expected no results when no blocks are visible"
);
}
}