use std::collections::Bound;
use bytes::Bytes;
use crate::{
block::EntryFlag,
errs::SegmentError,
keypair::{
KeyBytes,
ValueBytes,
},
segment_reader::SegmentReader,
utils::Deserializer,
};
/// Turns a bound over a borrowed byte slice into a bound that owns its bytes.
///
/// The bound variant is preserved; the slice carried by `Included` or
/// `Excluded` is copied into a new [`Bytes`] value.
pub(crate) fn convert_bound_to_bytes(bound: Bound<&[u8]>) -> Bound<Bytes> {
    let to_owned = Bytes::copy_from_slice;
    match bound {
        | Bound::Unbounded => Bound::Unbounded,
        | Bound::Excluded(raw) => Bound::Excluded(to_owned(raw)),
        | Bound::Included(raw) => Bound::Included(to_owned(raw)),
    }
}
/// Iterator over the entries of a segment whose key data falls inside a byte range.
///
/// Key blocks are visited one after another starting at the block index given
/// to `new`; each item is a `Result<(KeyBytes, ValueBytes), SegmentError>`.
pub struct SegmentScanIterator {
/// Reader used to fetch key blocks, multi-block keys and values.
reader: SegmentReader,
/// Index of the next key block to load; advanced on every load attempt.
current_block_index: usize,
/// Key block currently being walked; `None` before the first load and after
/// a lookup past the block's last entry.
current_key_block: Option<crate::block::ReadOnlyBlock>,
/// Position of the next entry to read inside `current_key_block`.
current_key_index: usize,
/// Lower limit compared against the key data (the bytes after `KEY_DATA_OFFSET`).
lower_bound: Bound<Bytes>,
/// Upper limit compared against the key data (the bytes after `KEY_DATA_OFFSET`).
upper_bound: Bound<Bytes>,
}
// Sequential scan: yields every entry whose key data lies inside the
// configured bounds, decoded into `(KeyBytes, ValueBytes)`.
impl Iterator for SegmentScanIterator {
    type Item = Result<(KeyBytes, ValueBytes), SegmentError>;

    /// Advances to the next in-range entry.
    ///
    /// Returns `None` once `load_next_block` reports that no key block is
    /// left. Errors from loading a block, assembling a multi-block key or
    /// reading the value are yielded as `Some(Err(_))`; calling `next` again
    /// resumes after the failing block or entry.
    ///
    /// Entries are skipped (not reported) when they are not flagged
    /// `Complete` or `Start`, when their key is shorter than
    /// `KEY_DATA_OFFSET`, when the key data is out of range, or when
    /// `read_value_for_key` returns `Ok(None)`.
    fn next(&mut self) -> Option<Self::Item> {
        use crate::segment::KEY_DATA_OFFSET;

        loop {
            // A new block is needed when none is loaded or the current one is used up.
            let needs_block = match &self.current_key_block {
                | Some(block) => self.current_key_index >= block.num_entries() as usize,
                | None => true,
            };
            if needs_block {
                match self.load_next_block() {
                    | Ok(true) => {},
                    | Ok(false) => return None, // No more blocks
                    | Err(e) => return Some(Err(e)),
                }
            }

            // A block is always present here; going around the loop again
            // (instead of unwrapping) keeps this path panic-free.
            let key_block = match self.current_key_block.as_ref() {
                | Some(block) => block,
                | None => continue,
            };

            match key_block.get(self.current_key_index) {
                | Some((flag, data)) => {
                    self.current_key_index += 1;

                    let key_bytes = match flag {
                        | EntryFlag::Complete => Bytes::copy_from_slice(data),
                        | EntryFlag::Start => match self.read_full_key(flag, data) {
                            | Ok(bytes) => bytes,
                            | Err(e) => return Some(Err(e)),
                        },
                        | _ => continue, // Skip middle or end entries
                    };

                    // `Bytes::slice` panics when the range starts past the end,
                    // so a truncated key is skipped like other unusable keys.
                    if key_bytes.len() < KEY_DATA_OFFSET {
                        continue;
                    }
                    let key_data = key_bytes.slice(KEY_DATA_OFFSET..);
                    if !self.is_in_range(&key_data) {
                        continue;
                    }

                    let key = KeyBytes::deserialize(key_data);
                    let val_bytes = match self.read_value_for_key(&key_bytes) {
                        | Ok(Some(bytes)) => bytes,
                        | Ok(None) => continue, // No value found, skip this key
                        | Err(e) => return Some(Err(e)),
                    };
                    let value = ValueBytes::deserialize(val_bytes);
                    return Some(Ok((key, value)));
                },
                | None => {
                    // Lookup past the end of the block: drop it so the next
                    // pass loads a new one.
                    self.current_key_block = None;
                },
            }
        }
    }
}
impl SegmentScanIterator {
    /// Creates an iterator that scans `reader` starting at key block `start_block`.
    ///
    /// `range` holds the `(lower, upper)` bounds; both are copied into owned
    /// `Bytes` so the iterator does not borrow from the caller.
    pub fn new(
        reader: SegmentReader,
        range: (Bound<&[u8]>, Bound<&[u8]>),
        start_block: usize,
    ) -> Self {
        let (lower, upper) = range;
        Self {
            reader,
            current_block_index: start_block,
            current_key_block: None,
            current_key_index: 0,
            lower_bound: convert_bound_to_bytes(lower),
            upper_bound: convert_bound_to_bytes(upper),
        }
    }

    /// Loads the key block at `current_block_index` and moves the index forward.
    ///
    /// Returns `Ok(false)` when the index has reached
    /// `reader.visible_key_blocks`, `Ok(true)` after a successful load. The
    /// index is advanced even when the read fails, so a later call moves on
    /// to the following block.
    fn load_next_block(&mut self) -> Result<bool, SegmentError> {
        let block_index = self.current_block_index;
        if block_index >= self.reader.visible_key_blocks {
            return Ok(false);
        }

        // Advance regardless of the read outcome.
        self.current_block_index += 1;
        let block = self.reader.read_key_block(block_index)?;
        self.current_key_block = Some(block);
        self.current_key_index = 0;
        Ok(true)
    }

    /// Assembles a key that spans several blocks via `read_multiblock_entry`.
    ///
    /// `current_block_index` already points at the next block to load, so the
    /// block holding `initial_data` is the one before it.
    fn read_full_key(&self, flag: EntryFlag, initial_data: &[u8]) -> Result<Bytes, SegmentError> {
        let owning_block = self.current_block_index - 1;
        self.reader
            .read_multiblock_entry(flag, initial_data, owning_block)
    }

    /// Reports whether `key` satisfies both the lower and the upper bound.
    fn is_in_range(&self, key: &Bytes) -> bool {
        let probe: &[u8] = key.as_ref();

        let below_lower = match &self.lower_bound {
            | Bound::Unbounded => false,
            | Bound::Included(lower) => probe < &lower[..],
            | Bound::Excluded(lower) => probe <= &lower[..],
        };
        if below_lower {
            return false;
        }

        match &self.upper_bound {
            | Bound::Unbounded => true,
            | Bound::Included(upper) => probe <= &upper[..],
            | Bound::Excluded(upper) => probe < &upper[..],
        }
    }

    /// Reads the value that the stored key entry `key` points at.
    ///
    /// The first 8 bytes are decoded as a little-endian `u64` value block
    /// number and the next 2 bytes as a little-endian `u16` entry index.
    /// Returns `Ok(None)` when `key` is shorter than those 10 bytes.
    fn read_value_for_key(&self, key: &Bytes) -> Result<Option<Bytes>, SegmentError> {
        if key.len() < 10 {
            return Ok(None);
        }

        let mut block_raw = [0u8; 8];
        block_raw.copy_from_slice(&key[..8]);
        let mut entry_raw = [0u8; 2];
        entry_raw.copy_from_slice(&key[8..10]);

        let value_block_num = u64::from_le_bytes(block_raw);
        let value_entry_index = u16::from_le_bytes(entry_raw);
        self.reader
            .read_value(value_block_num as usize, value_entry_index as usize)
            .map(Some)
    }
}
/// Iterator over the entries of a segment whose key data falls inside a byte
/// range, yielding undecoded entries.
///
/// Key blocks are visited one after another starting at the block index given
/// to `new`; each item is a `Result<crate::raw_entry::RawEntry, SegmentError>`.
pub(crate) struct RawSegmentScanIterator {
/// Reader used to fetch key blocks, multi-block keys and values.
reader: SegmentReader,
/// Index of the next key block to load; advanced on every load attempt.
current_block_index: usize,
/// Key block currently being walked; `None` before the first load and after
/// a lookup past the block's last entry.
current_key_block: Option<crate::block::ReadOnlyBlock>,
/// Position of the next entry to read inside `current_key_block`.
current_key_index: usize,
/// Lower limit compared against the key data (the bytes after `KEY_DATA_OFFSET`).
lower_bound: Bound<Bytes>,
/// Upper limit compared against the key data (the bytes after `KEY_DATA_OFFSET`).
upper_bound: Bound<Bytes>,
}
// Sequential scan: yields every entry whose key data lies inside the
// configured bounds as a `RawEntry` built from the key data and value bytes.
impl Iterator for RawSegmentScanIterator {
    type Item = Result<crate::raw_entry::RawEntry, SegmentError>;

    /// Advances to the next in-range entry.
    ///
    /// Returns `None` once `load_next_block` reports that no key block is
    /// left. Errors from loading a block, assembling a multi-block key or
    /// reading the value are yielded as `Some(Err(_))`; calling `next` again
    /// resumes after the failing block or entry.
    ///
    /// Entries are skipped (not reported) when they are not flagged
    /// `Complete` or `Start`, when their key is shorter than
    /// `KEY_DATA_OFFSET`, when the key data is out of range, or when
    /// `read_value_for_key` returns `Ok(None)`.
    fn next(&mut self) -> Option<Self::Item> {
        use crate::segment::KEY_DATA_OFFSET;

        loop {
            // A new block is needed when none is loaded or the current one is used up.
            let needs_block = match &self.current_key_block {
                | Some(block) => self.current_key_index >= block.num_entries() as usize,
                | None => true,
            };
            if needs_block {
                match self.load_next_block() {
                    | Ok(true) => {},
                    | Ok(false) => return None,
                    | Err(e) => return Some(Err(e)),
                }
            }

            // A block is always present here; going around the loop again
            // (instead of unwrapping) keeps this path panic-free.
            let key_block = match self.current_key_block.as_ref() {
                | Some(block) => block,
                | None => continue,
            };

            match key_block.get(self.current_key_index) {
                | Some((flag, data)) => {
                    self.current_key_index += 1;

                    let key_bytes = match flag {
                        | EntryFlag::Complete => Bytes::copy_from_slice(data),
                        | EntryFlag::Start => match self.read_full_key(flag, data) {
                            | Ok(bytes) => bytes,
                            | Err(e) => return Some(Err(e)),
                        },
                        // Any other flag is skipped here.
                        | _ => continue,
                    };

                    // `Bytes::slice` panics when the range starts past the end,
                    // so a truncated key is skipped like other unusable keys.
                    if key_bytes.len() < KEY_DATA_OFFSET {
                        continue;
                    }
                    let key_data = key_bytes.slice(KEY_DATA_OFFSET..);
                    if !self.is_in_range(&key_data) {
                        continue;
                    }

                    let val_bytes = match self.read_value_for_key(&key_bytes) {
                        | Ok(Some(bytes)) => bytes,
                        | Ok(None) => continue,
                        | Err(e) => return Some(Err(e)),
                    };
                    return Some(Ok(crate::raw_entry::RawEntry::new(key_data, val_bytes)));
                },
                | None => {
                    // Lookup past the end of the block: drop it so the next
                    // pass loads a new one.
                    self.current_key_block = None;
                },
            }
        }
    }
}
impl RawSegmentScanIterator {
    /// Creates a raw iterator that scans `reader` starting at key block `start_block`.
    ///
    /// `range` holds the `(lower, upper)` bounds; both are copied into owned
    /// `Bytes` so the iterator does not borrow from the caller.
    #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all, level = "debug"))]
    pub fn new(
        reader: SegmentReader,
        range: (Bound<&[u8]>, Bound<&[u8]>),
        start_block: usize,
    ) -> Self {
        let (lower, upper) = range;
        Self {
            reader,
            current_block_index: start_block,
            current_key_block: None,
            current_key_index: 0,
            lower_bound: convert_bound_to_bytes(lower),
            upper_bound: convert_bound_to_bytes(upper),
        }
    }

    /// Loads the key block at `current_block_index` and moves the index forward.
    ///
    /// Returns `Ok(false)` when the index has reached
    /// `reader.visible_key_blocks`, `Ok(true)` after a successful load. The
    /// index is advanced even when the read fails, so a later call moves on
    /// to the following block.
    fn load_next_block(&mut self) -> Result<bool, SegmentError> {
        let block_index = self.current_block_index;
        if block_index >= self.reader.visible_key_blocks {
            return Ok(false);
        }

        // Advance regardless of the read outcome.
        self.current_block_index += 1;
        let block = self.reader.read_key_block(block_index)?;
        self.current_key_block = Some(block);
        self.current_key_index = 0;
        Ok(true)
    }

    /// Assembles a key that spans several blocks via `read_multiblock_entry`.
    ///
    /// `current_block_index` already points at the next block to load, so the
    /// block holding `initial_data` is the one before it.
    fn read_full_key(&self, flag: EntryFlag, initial_data: &[u8]) -> Result<Bytes, SegmentError> {
        let owning_block = self.current_block_index - 1;
        self.reader
            .read_multiblock_entry(flag, initial_data, owning_block)
    }

    /// Reports whether `key` satisfies both the lower and the upper bound.
    fn is_in_range(&self, key: &Bytes) -> bool {
        let probe: &[u8] = key.as_ref();

        let below_lower = match &self.lower_bound {
            | Bound::Unbounded => false,
            | Bound::Included(lower) => probe < &lower[..],
            | Bound::Excluded(lower) => probe <= &lower[..],
        };
        if below_lower {
            return false;
        }

        match &self.upper_bound {
            | Bound::Unbounded => true,
            | Bound::Included(upper) => probe <= &upper[..],
            | Bound::Excluded(upper) => probe < &upper[..],
        }
    }

    /// Reads the value that the stored key entry `key` points at.
    ///
    /// The first 8 bytes are decoded as a little-endian `u64` value block
    /// number and the next 2 bytes as a little-endian `u16` entry index.
    /// Returns `Ok(None)` when `key` is shorter than those 10 bytes.
    fn read_value_for_key(&self, key: &Bytes) -> Result<Option<Bytes>, SegmentError> {
        if key.len() < 10 {
            return Ok(None);
        }

        let mut block_raw = [0u8; 8];
        block_raw.copy_from_slice(&key[..8]);
        let mut entry_raw = [0u8; 2];
        entry_raw.copy_from_slice(&key[8..10]);

        let value_block_num = u64::from_le_bytes(block_raw);
        let value_entry_index = u16::from_le_bytes(entry_raw);
        self.reader
            .read_value(value_block_num as usize, value_entry_index as usize)
            .map(Some)
    }
}
#[cfg(test)]
#[allow(clippy::question_mark_used)]
#[allow(clippy::missing_safety_doc)]
#[allow(clippy::undocumented_unsafe_blocks)]
mod tests {
use std::{
collections::Bound,
sync::Arc,
};
use tempfile::tempdir;
use super::*;
use crate::{
block::{
BLOCK_SIZE,
Block,
EntryFlag,
},
index::Index,
map::Map,
segment_reader::SegmentReader,
};
/// Test fixture: writes seven single-entry key blocks and value blocks into
/// temporary maps and returns a `SegmentReader` over them.
///
/// Each stored key is `namespace (8 bytes) + name + 16 zero bytes`; each
/// stored value is `namespace + payload`. Both indexes receive the key
/// without its 16-byte suffix, but only `key_index` is handed to the reader.
/// The returned `TempDir` is the directory holding the map files.
fn create_scan_test_segment(
    mut key_index: Index,
    mut val_index: Index,
) -> (SegmentReader, tempfile::TempDir) {
    const NS0: [u8; 8] = [0; 8];
    const NS1: [u8; 8] = [1, 0, 0, 0, 0, 0, 0, 0];

    let dir = tempdir().expect("failed to create temp dir");
    let map_len = (10 * BLOCK_SIZE) as u64;
    let key_map = Arc::new(
        Map::new(dir.path().join("scan-key-segment"), map_len)
            .expect("failed to create key map"),
    );
    let val_map = Arc::new(
        Map::new(dir.path().join("scan-val-segment"), map_len)
            .expect("failed to create val map"),
    );

    let fixtures = [
        (NS0, b"key_a", b"value_a"),
        (NS0, b"key_b", b"value_b"),
        (NS0, b"key_c", b"value_c"),
        (NS0, b"key_d", b"value_d"),
        (NS0, b"key_e", b"value_e"),
        (NS1, b"key_a", b"ns1_val"),
        (NS1, b"key_z", b"ns1_val"),
    ];

    for (slot, (ns, key, value)) in fixtures.iter().enumerate() {
        // namespace + name + 16 zero bytes (the suffix stripped again below).
        let full_key = [&ns[..], &key[..], &[0u8; 16][..]].concat();
        let full_value = [&ns[..], &value[..]].concat();

        let mut key_block = Block::new();
        key_block
            .add_entry(&full_key, EntryFlag::Complete)
            .expect("Failed to add key entry");
        let mut val_block = Block::new();
        val_block
            .add_entry(&full_value, EntryFlag::Complete)
            .expect("Failed to add value entry");

        // One block per fixture, laid out back to back.
        let start = slot * BLOCK_SIZE;
        let end = start + BLOCK_SIZE;
        // NOTE(review): presumably `finalize` writes at most BLOCK_SIZE bytes
        // through the pointer; the target slice is BLOCK_SIZE long. Confirm.
        key_map
            .write_to_range(start..end, |slice| unsafe {
                key_block.finalize(slice.as_mut_ptr());
            })
            .expect("Failed to write key block");
        val_map
            .write_to_range(start..end, |slice| unsafe {
                val_block.finalize(slice.as_mut_ptr());
            })
            .expect("Failed to write value block");

        let key_without_ts = &full_key[..full_key.len() - 16];
        key_index.inc_block_count(1);
        key_index.insert_item(key_without_ts);
        val_index.inc_block_count(1);
        val_index.insert_item(key_without_ts);
    }

    let shared_index = Arc::new(parking_lot::RwLock::new(key_index));
    let reader = SegmentReader::new(key_map, val_map, shared_index)
        .expect("Failed to create segment reader");
    (reader, dir)
}
/// `convert_bound_to_bytes` keeps the bound variant and copies the payload.
#[test]
fn test_convert_bound_to_bytes() {
    let data = b"test_data";

    if let Bound::Included(bytes) = convert_bound_to_bytes(Bound::Included(&data[..])) {
        assert_eq!(bytes.as_ref(), data);
    } else {
        panic!("Expected Included bound");
    }

    if let Bound::Excluded(bytes) = convert_bound_to_bytes(Bound::Excluded(&data[..])) {
        assert_eq!(bytes.as_ref(), data);
    } else {
        panic!("Expected Excluded bound");
    }

    let converted = convert_bound_to_bytes(Bound::Unbounded);
    assert!(
        matches!(converted, Bound::Unbounded),
        "Expected Unbounded bound"
    );
}
/// A scan whose lower bound sorts after its upper bound yields nothing.
#[test]
fn test_segment_scan_iterator_empty_range() {
    /// Builds `8 zero bytes + name + 16 zero bytes`.
    fn padded_key(name: &[u8]) -> Vec<u8> {
        let mut buf = vec![0u8; 8];
        buf.extend_from_slice(name);
        buf.extend_from_slice(&[0u8; 16]);
        buf
    }

    let seed = 42i64;
    let (reader, _dir) = create_scan_test_segment(Index::new(1, seed), Index::new(2, seed));

    let lower = padded_key(b"key_z");
    let upper = padded_key(b"key_a");
    let results: Vec<_> = reader
        .scan(Bound::Included(&lower), Bound::Included(&upper))
        .collect();
    assert!(results.is_empty(), "Expected no results for empty range");
}
/// Checks that key blocks 0, 1 and 2 of the fixture segment can be read.
///
/// NOTE(review): despite its name, this test performs no scan and no
/// inclusive-bound check.
#[test]
fn test_segment_scan_iterator_inclusive_bounds() {
    let seed = 42i64;
    let (reader, _dir) = create_scan_test_segment(Index::new(1, seed), Index::new(2, seed));

    let first_ok = reader.read_key_block(0).is_ok();
    assert!(first_ok, "Should be able to read block 0");

    let second_ok = reader.read_key_block(1).is_ok();
    assert!(second_ok, "Should be able to read block 1");

    let third_ok = reader.read_key_block(2).is_ok();
    assert!(third_ok, "Should be able to read block 2");
}
/// Checks that the fixture reader reports visible key and value blocks.
///
/// NOTE(review): despite its name, this test performs no scan and no
/// exclusive-bound check.
#[test]
fn test_segment_scan_iterator_exclusive_bounds() {
    let seed = 42i64;
    let (reader, _dir) = create_scan_test_segment(Index::new(1, seed), Index::new(2, seed));

    let key_blocks = reader.visible_key_blocks;
    assert!(key_blocks > 0, "Should have visible key blocks");

    let val_blocks = reader.visible_val_blocks;
    assert!(val_blocks > 0, "Should have visible value blocks");
}
/// Checks that key block 0 of the fixture segment can be read.
///
/// NOTE(review): despite its name, this test performs no scan and no
/// mixed-bound check.
#[test]
fn test_segment_scan_iterator_mixed_bounds() {
    let seed = 42i64;
    let (reader, _dir) = create_scan_test_segment(Index::new(1, seed), Index::new(2, seed));

    assert!(
        reader.read_key_block(0).is_ok(),
        "Should be able to read block 0"
    );
}
/// Checks that key block 0 of the fixture segment can be read.
///
/// NOTE(review): despite its name, this test creates no seeking iterator and
/// checks no bounds.
#[test]
fn test_segment_scan_seeking_iterator_mixed_bounds() {
    let seed = 42i64;
    let (reader, _dir) = create_scan_test_segment(Index::new(1, seed), Index::new(2, seed));

    let readable = reader.read_key_block(0).is_ok();
    assert!(readable, "Should be able to read block 0");
}
/// Checks that key block 0 can be read and holds at least one entry.
///
/// NOTE(review): despite its name, this test performs no unbounded scan.
#[test]
fn test_segment_scan_iterator_unbounded() {
    let seed = 42i64;
    let (reader, _dir) = create_scan_test_segment(Index::new(1, seed), Index::new(2, seed));

    match reader.read_key_block(0) {
        | Ok(block) => assert!(block.num_entries() > 0, "Block should have entries"),
        | Err(_) => panic!("Should be able to read block 0"),
    }
}
/// Checks that the fixture reader reports at least one visible key block.
///
/// NOTE(review): despite its name, this test performs no namespace filtering.
#[test]
fn test_segment_scan_iterator_namespace_filtering() {
    let seed = 42i64;
    let (reader, _dir) = create_scan_test_segment(Index::new(1, seed), Index::new(2, seed));

    let visible = reader.visible_key_blocks;
    assert!(visible > 0, "Expected blocks in segment");
}
/// A key that was never inserted is rejected by `may_contain` and has no
/// block in the key index.
#[test]
fn test_segment_scan_iterator_non_existent_keys() {
    let seed = 42i64;
    let (reader, _dir) = create_scan_test_segment(Index::new(1, seed), Index::new(2, seed));

    // Eight zero namespace bytes followed by "not_found".
    let non_existent_key: &[u8] = b"\0\0\0\0\0\0\0\0not_found";

    let maybe_present = reader.key_index.write().may_contain(non_existent_key);
    assert!(
        !maybe_present,
        "Bloom filter should not contain non-existent key"
    );

    let block_missing = reader
        .key_index
        .write()
        .get_block(non_existent_key)
        .is_none();
    assert!(block_missing, "Should not find block for non-existent key");
}
/// Exercises `SegmentScanIterator::is_in_range` with inclusive, exclusive and
/// unbounded limits, after confirming the fixture index holds blocks.
#[test]
fn test_is_in_range() {
    let seed = 42i64;
    let key_index = Index::new(1, seed);
    let val_index = Index::new(2, seed);
    let (reader, _dir) = create_scan_test_segment(key_index, val_index);
    assert!(
        reader.key_index.write().block_count() > 0,
        "Expected blocks in key index"
    );

    // [key_b, key_d): lower limit inclusive, upper limit exclusive.
    let mut iter = SegmentScanIterator::new(
        reader,
        (Bound::Included(&b"key_b"[..]), Bound::Excluded(&b"key_d"[..])),
        0,
    );
    assert!(
        !iter.is_in_range(&Bytes::from_static(b"key_a")),
        "key below an inclusive lower bound must be rejected"
    );
    assert!(
        iter.is_in_range(&Bytes::from_static(b"key_b")),
        "key equal to an inclusive lower bound must be accepted"
    );
    assert!(
        iter.is_in_range(&Bytes::from_static(b"key_c")),
        "key strictly inside the range must be accepted"
    );
    assert!(
        !iter.is_in_range(&Bytes::from_static(b"key_d")),
        "key equal to an exclusive upper bound must be rejected"
    );

    // (key_b, key_d]: lower limit exclusive, upper limit inclusive.
    iter.lower_bound = Bound::Excluded(Bytes::from_static(b"key_b"));
    iter.upper_bound = Bound::Included(Bytes::from_static(b"key_d"));
    assert!(
        !iter.is_in_range(&Bytes::from_static(b"key_b")),
        "key equal to an exclusive lower bound must be rejected"
    );
    assert!(
        iter.is_in_range(&Bytes::from_static(b"key_d")),
        "key equal to an inclusive upper bound must be accepted"
    );
    assert!(
        !iter.is_in_range(&Bytes::from_static(b"key_e")),
        "key above an inclusive upper bound must be rejected"
    );

    // No limits at all: everything is accepted, including the empty key.
    iter.lower_bound = Bound::Unbounded;
    iter.upper_bound = Bound::Unbounded;
    assert!(
        iter.is_in_range(&Bytes::new()),
        "empty key must be accepted when both bounds are unbounded"
    );
    assert!(
        iter.is_in_range(&Bytes::from_static(b"key_z")),
        "any key must be accepted when both bounds are unbounded"
    );
}
/// With `visible_key_blocks` forced to zero, an unbounded scan yields nothing.
///
/// NOTE(review): despite its name, no error path is exercised here.
#[test]
fn test_segment_scan_iterator_errors() {
    let seed = 42i64;
    let (mut reader, _dir) =
        create_scan_test_segment(Index::new(1, seed), Index::new(2, seed));

    // Hide every key block from the scan.
    reader.visible_key_blocks = 0;

    let yielded = reader.scan(Bound::Unbounded, Bound::Unbounded).count();
    assert_eq!(
        yielded,
        0,
        "Expected no results when no blocks are visible"
    );
}
}