use super::format::{Compression, FileTrailer};
use super::{AnyFactories, Deserializer, Factories};
use crate::dynamic::{DynVec, WeightTrait};
use crate::storage::buffer_cache::CacheAccess;
use crate::storage::file::format::{BatchMetadata, FilterBlock};
use crate::storage::tracking_bloom_filter::TrackingBloomFilter;
use crate::storage::{
backend::StorageError,
buffer_cache::{BufferCache, FBuf},
file::format::{
DataBlockHeader, FileTrailerColumn, IndexBlockHeader, NodeType, VERSION_NUMBER, Varint,
},
file::item::ArchivedItem,
};
use crate::{
dynamic::{DataTrait, DeserializeDyn, Factory},
storage::{
backend::{BlockLocation, FileReader, InvalidBlockLocation, StorageBackend},
buffer_cache::{AtomicCacheStats, CacheStats},
},
};
use binrw::{
BinRead,
io::{self},
};
use crc32c::crc32c;
use dyn_clone::clone_box;
use feldera_buffer_cache::CacheEntry;
use feldera_storage::StoragePath;
use feldera_storage::file::FileId;
use size_of::SizeOf;
use smallvec::SmallVec;
use snap::raw::{Decoder, decompress_len};
use std::mem::replace;
use std::ops::Index;
use std::time::Instant;
use std::{
cmp::{
Ordering::{self, *},
max, min,
},
fmt::{Debug, Formatter, Result as FmtResult},
marker::PhantomData,
mem::size_of,
ops::{Bound, Range, RangeBounds},
sync::Arc,
};
use thiserror::Error as ThisError;
use tracing::info;
mod bulk_rows;
pub use bulk_rows::BulkRows;
mod fetch_zset;
pub use fetch_zset::FetchZSet;
mod fetch_indexed_zset;
pub use fetch_indexed_zset::FetchIndexedZSet;
/// Errors that can occur while opening or reading a layer file.
#[derive(ThisError, Debug)]
pub enum Error {
    /// The file's contents are malformed; see [`CorruptionError`] for details.
    #[error("Corrupt layer file: {0}")]
    Corruption(#[from] CorruptionError),
    /// The storage backend reported an error.
    #[error("Error accessing storage: {0}")]
    Storage(#[from] StorageError),
    /// The number of columns in the file differs from the number the
    /// reader's column specification describes.
    #[error("File has {actual} column(s) but should have {expected}.")]
    WrongNumberOfColumns {
        /// Number of columns found in the file.
        actual: usize,
        /// Number of columns the reader expected.
        expected: usize,
    },
    /// The requested operation is not supported.
    #[error("The requested operation is not supported.")]
    Unsupported,
}
#[derive(ThisError, Clone, Debug)]
pub enum CorruptionError {
#[error("File size {0} must be a positive multiple of 512")]
InvalidFileSize(
u64,
),
#[error(
"Block ({location}) with magic {magic:?} has invalid checksum {checksum:#x} (expected {computed_checksum:#x})"
)]
InvalidChecksum {
location: BlockLocation,
magic: [u8; 4],
checksum: u32,
computed_checksum: u32,
},
#[error("File has invalid version {version} (expected {expected_version})")]
InvalidVersion {
version: u32,
expected_version: u32,
},
#[error("File uses unsupported incompatible features {0:#x}")]
UnsupportedIncompatibleFeatures(
u64,
),
#[error("Binary read/write error reading {block_type} block ({location}): {inner}")]
Binrw {
location: BlockLocation,
block_type: &'static str,
inner: String,
},
#[error(
"{count}-element array of {each}-byte elements starting at offset {offset} within block overflows {block_size}-byte block"
)]
InvalidArray {
block_size: usize,
offset: usize,
count: usize,
each: usize,
},
#[error(
"{count} strides of {stride} bytes each starting at offset {start} overflows {block_size}-byte block"
)]
InvalidStride {
block_size: usize,
start: usize,
stride: usize,
count: usize,
},
#[error("Index nesting depth {depth} exceeds maximum ({max_depth}).")]
TooDeep {
depth: usize,
max_depth: usize,
},
#[error("File has no columns.")]
NoColumns,
#[error("Index block ({0}) is empty")]
EmptyIndex(BlockLocation),
#[error("Data block ({location}) contains rows {rows:?} but {expected_rows:?} were expected.")]
DataBlockWrongRows {
location: BlockLocation,
rows: Range<u64>,
expected_rows: Range<u64>,
},
#[error("Index block ({location}) contains {n_rows} rows but {expected_rows} were expected.")]
IndexBlockWrongNumberOfRows {
location: BlockLocation,
n_rows: u64,
expected_rows: u64,
},
#[error("Index block ({location}) has nonmonotonic row totals ({prev} then {next}).")]
NonmonotonicIndex {
location: BlockLocation,
prev: u64,
next: u64,
},
#[error(
"Column {column} has fewer rows ({this_n_rows}) than the previous column ({prev_n_rows})."
)]
DecreasingRowCount {
column: usize,
this_n_rows: u64,
prev_n_rows: u64,
},
#[error("Unexpectedly missing row {0} in column 1 (or later)")]
MissingRow(
u64,
),
#[error(
"Index block ({location}) has child {index} with invalid offset {child_offset} or size {child_size}."
)]
InvalidChild {
location: BlockLocation,
index: usize,
child_offset: u64,
child_size: usize,
},
#[error(
"File trailer column specification has invalid node offset {node_offset} or size {node_size}."
)]
InvalidColumnRoot {
node_offset: u64,
node_size: u32,
},
#[error("Row group {index} in data block ({location}) has invalid row range {start}..{end}.")]
InvalidRowGroup {
location: BlockLocation,
index: usize,
start: u64,
end: u64,
},
#[error("Block ({0}) is wrong type of block.")]
BadBlockType(BlockLocation),
#[error(
"Compressed block ({location}) claims compressed length {compressed_len} but at most {max_compressed_len} would fit."
)]
BadCompressedLen {
location: BlockLocation,
compressed_len: usize,
max_compressed_len: usize,
},
#[error(
"Compressed block ({location}) decompressed to {length} bytes instead of the expected {expected_length} bytes"
)]
UnexpectedDecompressionLength {
location: BlockLocation,
length: usize,
expected_length: usize,
},
#[error("Compressed block ({location}) failed Snappy decompression: {error}.")]
Snappy {
location: BlockLocation,
error: snap::Error,
},
#[error("Multiple paths to block ({0}).")]
MultiplePaths(BlockLocation),
#[error("Invalid file block location ({0}).")]
InvalidFilterLocation(InvalidBlockLocation),
}
/// Bounds-checked accessor for an array of `count` equally sized [`Varint`]
/// values that starts at byte offset `start` within a block.
#[derive(Clone)]
struct VarintReader {
    /// Encoding (and therefore byte width) of every element.
    varint: Varint,
    /// Byte offset of element 0 within the block.
    start: usize,
    /// Number of elements.
    count: usize,
}
impl VarintReader {
    /// Creates a reader for `count` elements encoded as `varint`, beginning at
    /// byte `start` of `buf`.
    ///
    /// # Errors
    ///
    /// Returns [`CorruptionError::InvalidArray`] if the array would run past
    /// the end of `buf`, including when computing its extent overflows.
    fn new(buf: &FBuf, varint: Varint, start: usize, count: usize) -> Result<Self, Error> {
        let block_size = buf.len();
        let each = varint.len();
        let fits = each
            .checked_mul(count)
            .and_then(|bytes| bytes.checked_add(start))
            .is_some_and(|end| end <= block_size);
        if !fits {
            return Err(CorruptionError::InvalidArray {
                block_size,
                offset: start,
                count,
                each,
            }
            .into());
        }
        Ok(Self {
            varint,
            start,
            count,
        })
    }

    /// Like [`VarintReader::new`], but yields `Ok(None)` when `varint` is
    /// `None`, i.e. when the array is absent.
    fn new_opt(
        buf: &FBuf,
        varint: Option<Varint>,
        start: usize,
        count: usize,
    ) -> Result<Option<Self>, Error> {
        match varint {
            Some(encoding) => VarintReader::new(buf, encoding, start, count).map(Some),
            None => Ok(None),
        }
    }

    /// Decodes element `index` from `src`, which must be the buffer this
    /// reader was validated against.
    fn get(&self, src: &FBuf, index: usize) -> u64 {
        debug_assert!(index < self.count);
        let offset = self.start + index * self.varint.len();
        self.varint.get(src, offset)
    }
}
/// Locates `count` values laid out at a fixed byte `stride`, starting at byte
/// offset `start` within a block.
#[derive(Clone)]
struct StrideReader {
    /// Byte offset of value 0.
    start: usize,
    /// Distance in bytes between consecutive values.
    stride: usize,
    /// Number of values.
    count: usize,
}
impl StrideReader {
    /// Creates a reader for `count` values spaced `stride` bytes apart from
    /// byte `start` of `raw`.
    ///
    /// # Errors
    ///
    /// Returns [`CorruptionError::InvalidStride`] if `count` is zero, if
    /// computing the last value's offset overflows, or if that offset is not
    /// inside `raw`.
    fn new(raw: &FBuf, start: usize, stride: usize, count: usize) -> Result<Self, Error> {
        let block_size = raw.len();
        // Offset of the final value; `None` when there is no final value or
        // the arithmetic overflows.
        let last_offset = count
            .checked_sub(1)
            .and_then(|steps| stride.checked_mul(steps))
            .and_then(|span| span.checked_add(start));
        match last_offset {
            Some(last) if last < block_size => Ok(Self {
                start,
                stride,
                count,
            }),
            _ => Err(CorruptionError::InvalidStride {
                block_size,
                start,
                stride,
                count,
            }
            .into()),
        }
    }

    /// Returns the byte offset of value `index`.
    fn get(&self, index: usize) -> usize {
        debug_assert!(index < self.count);
        index * self.stride + self.start
    }
}
/// Finds the byte offset of each value in a data block. It either uses an
/// explicit offset array or a start offset plus a fixed stride.
#[derive(Clone)]
enum ValueMapReader {
    /// An explicit byte offset for every value.
    VarintMap(VarintReader),
    /// Value `i` is at `start + i * stride`.
    StrideMap(StrideReader),
}
impl ValueMapReader {
    /// Builds the value map for a block with `n_values` values.
    ///
    /// With `Some(varint)`, `offset` locates an array of per-value offsets.
    /// With `None`, `offset` locates two `Varint::B32` words. The first is the
    /// start offset and the second is the stride.
    fn new(raw: &FBuf, varint: Option<Varint>, offset: u32, n_values: u32) -> Result<Self, Error> {
        let (offset, n_values) = (offset as usize, n_values as usize);
        match varint {
            Some(encoding) => {
                VarintReader::new(raw, encoding, offset, n_values).map(Self::VarintMap)
            }
            None => {
                let layout = VarintReader::new(raw, Varint::B32, offset, 2)?;
                let first = layout.get(raw, 0) as usize;
                let step = layout.get(raw, 1) as usize;
                StrideReader::new(raw, first, step, n_values).map(Self::StrideMap)
            }
        }
    }

    /// Returns the number of values in the map.
    fn len(&self) -> usize {
        match self {
            Self::VarintMap(offsets) => offsets.count,
            Self::StrideMap(strides) => strides.count,
        }
    }

    /// Returns the byte offset within `raw` of value `index`.
    fn get(&self, raw: &FBuf, index: usize) -> usize {
        match self {
            Self::VarintMap(offsets) => offsets.get(raw, index) as usize,
            Self::StrideMap(strides) => strides.get(index),
        }
    }
}
/// A parsed data (non-index) block holding serialized `(K, A)` items.
pub(super) struct DataBlock<K, A>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
{
    /// Where this block lives in the file.
    location: BlockLocation,
    /// The block's bytes.
    raw: Arc<FBuf>,
    /// Byte offset of each item within `raw`.
    value_map: ValueMapReader,
    /// Optional array of `n_values + 1` row-group boundaries.
    row_groups: Option<VarintReader>,
    /// Row number, within the column, of the block's first item.
    first_row: u64,
    /// File format version, handed to the deserializer.
    version: u32,
    _phantom: PhantomData<fn(&K, &A)>,
}
impl<K, A> CacheEntry for DataBlock<K, A>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
{
    /// Charges the cache for this struct plus the full capacity of its buffer.
    fn cost(&self) -> usize {
        let buffer_bytes = self.raw.capacity();
        buffer_bytes + size_of::<Self>()
    }
}
impl<K, A> DataBlock<K, A>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
{
    /// Parses data block `raw`, found at `location`, whose first item is row
    /// `first_row` of its column.
    ///
    /// Reads the block header with `binrw`. It then builds bounds-checked
    /// readers for the value map and, if the header declares one, the
    /// row-group array of `n_values + 1` entries.
    ///
    /// # Errors
    ///
    /// [`CorruptionError::Binrw`] if the header cannot be parsed, or
    /// [`CorruptionError::InvalidArray`] / [`CorruptionError::InvalidStride`]
    /// if one of the arrays does not fit inside the block.
    pub(super) fn from_raw(
        raw: Arc<FBuf>,
        location: BlockLocation,
        first_row: u64,
        version: u32,
    ) -> Result<Self, Error> {
        let header =
            DataBlockHeader::read_le(&mut io::Cursor::new(raw.as_slice())).map_err(|e| {
                Error::Corruption(CorruptionError::Binrw {
                    location,
                    block_type: "data",
                    inner: e.to_string(),
                })
            })?;
        Ok(Self {
            location,
            value_map: ValueMapReader::new(
                &raw,
                header.value_map_varint,
                header.value_map_ofs,
                header.n_values,
            )?,
            row_groups: VarintReader::new_opt(
                &raw,
                header.row_group_varint,
                header.row_groups_ofs as usize,
                // One boundary per value plus a final end boundary; see
                // `row_group`, which reads entries `index` and `index + 1`.
                header.n_values as usize + 1,
            )?,
            // Moved last, after the readers above have finished borrowing it.
            raw,
            first_row,
            version,
            _phantom: PhantomData,
        })
    }

    /// Parses `raw` as the data block for `node` and inserts the result into
    /// `cache`, keyed by `file_id` and the block's offset.
    pub(super) fn from_raw_with_cache(
        raw: Arc<FBuf>,
        node: &TreeNode,
        cache: &BufferCache,
        file_id: FileId,
        version: u32,
    ) -> Result<Arc<Self>, Error> {
        let block = Arc::new(Self::from_raw(
            raw,
            node.location,
            node.rows.start,
            version,
        )?);
        cache.insert(file_id, node.location.offset, block.clone());
        Ok(block)
    }

    /// Recovers a data block from a cache entry.
    ///
    /// Fails with [`CorruptionError::BadBlockType`] if the entry cached at
    /// `location` is some other type.
    fn from_cache_entry(
        cache_entry: Arc<dyn CacheEntry>,
        location: BlockLocation,
    ) -> Result<Arc<Self>, Error> {
        cache_entry
            .downcast()
            .ok_or(Error::Corruption(CorruptionError::BadBlockType(location)))
    }

    /// Returns the data block for `node`.
    ///
    /// The block comes from the buffer cache when present. Otherwise it is
    /// read via `file.read_block`, which decompresses and verifies it, and
    /// then inserted into the cache. The hit or miss and its latency are
    /// recorded in `file.stats`.
    ///
    /// # Errors
    ///
    /// Besides read and parse errors, returns
    /// [`CorruptionError::DataBlockWrongRows`] if the block does not cover
    /// exactly `node.rows`.
    fn new(file: &ImmutableFileRef, node: &TreeNode) -> Result<Arc<Self>, Error> {
        let start = Instant::now();
        let cache = file.cache();
        #[allow(clippy::borrow_deref_ref)]
        let (access, entry) = match cache.get(&*file.file_handle, node.location) {
            Some(entry) => (
                CacheAccess::Hit,
                Self::from_cache_entry(entry, node.location)?,
            ),
            None => {
                let block = file.read_block(node.location)?;
                let entry = Self::from_raw_with_cache(
                    block,
                    node,
                    &cache,
                    file.file_handle.file_id(),
                    file.version,
                )?;
                (CacheAccess::Miss, entry)
            }
        };
        file.stats.record(access, start.elapsed(), node.location);
        // Applies to cache hits as well as fresh reads: the cached block must
        // agree with the row range the parent index claims for it.
        if entry.rows() != node.rows {
            return Err(CorruptionError::DataBlockWrongRows {
                location: node.location,
                rows: entry.rows(),
                expected_rows: node.rows.clone(),
            }
            .into());
        }
        Ok(entry)
    }

    /// Number of items in the block.
    fn n_values(&self) -> usize {
        self.value_map.len()
    }

    /// Absolute row numbers covered by this block.
    fn rows(&self) -> Range<u64> {
        self.first_row..(self.first_row + self.n_values() as u64)
    }

    /// Returns the range between row-group entries `index` and `index + 1`.
    /// These are presumably row numbers in the following column (TODO
    /// confirm against the writer).
    ///
    /// # Panics
    ///
    /// Panics if this block has no row-group array.
    ///
    /// # Errors
    ///
    /// [`CorruptionError::InvalidRowGroup`] if the range is empty or reversed.
    fn row_group(&self, index: usize) -> Result<Range<u64>, Error> {
        // NOTE(review): a corrupt file whose block lacks a row-group array
        // panics here rather than returning an error.
        let row_groups = self.row_groups.as_ref().unwrap();
        let start = row_groups.get(&self.raw, index);
        let end = row_groups.get(&self.raw, index + 1);
        if start < end {
            Ok(start..end)
        } else {
            Err(CorruptionError::InvalidRowGroup {
                location: self.location,
                index,
                start,
                end,
            }
            .into())
        }
    }

    /// Like [`Self::row_group`], but takes an absolute row number.
    fn row_group_for_row(&self, row: u64) -> Result<Range<u64>, Error> {
        let index = (row - self.first_row) as usize;
        self.row_group(index)
    }

    /// Returns the archived (still serialized) item at `index`.
    ///
    /// # Safety
    ///
    /// Relies on `factories.item_factory.archived_value` interpreting the
    /// bytes at the value map's offset as an archived item. The caller must
    /// ensure `index < self.n_values()` and that the block contents are valid
    /// for that factory.
    unsafe fn archived_item(
        &self,
        factories: &Factories<K, A>,
        index: usize,
    ) -> &dyn ArchivedItem<'_, K, A> {
        unsafe {
            factories
                .item_factory
                .archived_value(&self.raw, self.value_map.get(&self.raw, index))
        }
    }

    /// Like [`Self::archived_item`], but takes an absolute row number.
    ///
    /// # Safety
    ///
    /// Same as [`Self::archived_item`].
    unsafe fn archived_item_for_row(
        &self,
        factories: &Factories<K, A>,
        row: u64,
    ) -> &dyn ArchivedItem<'_, K, A> {
        unsafe {
            let index = (row - self.first_row) as usize;
            self.archived_item(factories, index)
        }
    }

    /// Deserializes the key and auxiliary value at `index` into `item`.
    ///
    /// # Safety
    ///
    /// Same as [`Self::archived_item`].
    unsafe fn item(&self, factories: &Factories<K, A>, index: usize, item: (&mut K, &mut A)) {
        unsafe {
            let archived_item = self.archived_item(factories, index);
            let mut deserializer = Deserializer::new(self.version);
            DeserializeDyn::deserialize_with(archived_item.fst(), item.0, &mut deserializer);
            // A fresh deserializer is used for the second component.
            let mut deserializer = Deserializer::new(self.version);
            DeserializeDyn::deserialize_with(archived_item.snd(), item.1, &mut deserializer);
        }
    }

    /// Like [`Self::item`], but takes an absolute row number.
    ///
    /// # Safety
    ///
    /// Same as [`Self::archived_item`].
    unsafe fn item_for_row(&self, factories: &Factories<K, A>, row: u64, item: (&mut K, &mut A)) {
        unsafe {
            let index = (row - self.first_row) as usize;
            self.item(factories, index, item)
        }
    }

    /// Deserializes only the key at `index` into `key`.
    ///
    /// # Safety
    ///
    /// Same as [`Self::archived_item`].
    unsafe fn key(&self, factories: &Factories<K, A>, index: usize, key: &mut K) {
        unsafe {
            let item = self.archived_item(factories, index);
            let mut deserializer = Deserializer::new(self.version);
            DeserializeDyn::deserialize_with(item.fst(), key, &mut deserializer)
        }
    }

    /// Deserializes the first key into `min` and the last key into `max`.
    ///
    /// # Safety
    ///
    /// Same as [`Self::archived_item`]. In addition, the block must be
    /// non-empty: `self.n_values() - 1` underflows otherwise.
    unsafe fn key_range(&self, factories: &Factories<K, A>, min: &mut K, max: &mut K) {
        unsafe {
            self.key(factories, 0, min);
            self.key(factories, self.n_values() - 1, max);
        }
    }

    /// Deserializes only the auxiliary value at `index` into `aux`.
    ///
    /// # Safety
    ///
    /// Same as [`Self::archived_item`].
    unsafe fn aux(&self, factories: &Factories<K, A>, index: usize, aux: &mut A) {
        unsafe {
            let item = self.archived_item(factories, index);
            let mut deserializer = Deserializer::new(self.version);
            DeserializeDyn::deserialize_with(item.snd(), aux, &mut deserializer)
        }
    }

    /// Like [`Self::key`], but takes an absolute row number.
    ///
    /// # Safety
    ///
    /// Same as [`Self::archived_item`].
    unsafe fn key_for_row(&self, factories: &Factories<K, A>, row: u64, key: &mut K) {
        unsafe {
            let index = (row - self.first_row) as usize;
            self.key(factories, index, key)
        }
    }

    /// Like [`Self::aux`], but takes an absolute row number.
    ///
    /// # Safety
    ///
    /// Same as [`Self::archived_item`].
    unsafe fn aux_for_row(&self, factories: &Factories<K, A>, row: u64, aux: &mut A) {
        unsafe {
            let index = (row - self.first_row) as usize;
            self.aux(factories, index, aux)
        }
    }

    /// Binary-searches the items that lie in both this block and
    /// `target_rows`.
    ///
    /// Each probed key is loaded into `key` and passed to `compare`. A result
    /// of `Less` searches lower and `Greater` searches higher. `Equal` returns
    /// that index at once.
    ///
    /// Otherwise the result is the last probed index whose comparison equalled
    /// `bias`, or `None`. When the result is `Some`, `key` holds that index's
    /// key on return. Also returns `None` if the two row ranges do not
    /// overlap.
    ///
    /// # Safety
    ///
    /// Same as [`Self::archived_item`].
    unsafe fn find_best_match<C>(
        &self,
        factories: &Factories<K, A>,
        target_rows: &Range<u64>,
        compare: &C,
        bias: Ordering,
        key: &mut K,
    ) -> Option<usize>
    where
        C: Fn(&K) -> Ordering,
    {
        unsafe {
            let block_rows = self.rows();
            if block_rows.start >= target_rows.end || block_rows.end <= target_rows.start {
                return None;
            }
            let mut best = None;
            // Search only the block-relative indexes that fall in
            // `target_rows`.
            let mut start = (max(block_rows.start, target_rows.start) - self.first_row) as usize;
            let mut end = (min(block_rows.end, target_rows.end) - self.first_row) as usize;
            let mut mid = 0;
            while start < end {
                mid = start.midpoint(end);
                self.key(factories, mid, key);
                let cmp = compare(key);
                match cmp {
                    Less => end = mid,
                    Equal => return Some(mid),
                    Greater => start = mid + 1,
                };
                if cmp == bias {
                    best = Some(mid);
                }
            }
            // `key` holds the last probed key (`mid`); reload it only if the
            // best match was probed earlier.
            if let Some(best) = best
                && best != mid
            {
                self.key(factories, best, key);
            }
            best
        }
    }

    /// Binary-searches the whole block for a key equal to `target` and
    /// returns whether it was found.
    ///
    /// `index_stack[i]` and `key_stack[i]` memoize the index and key probed at
    /// depth `i`. A later search that probes the same index at the same depth
    /// reuses the stored key instead of deserializing it again. On a miss at
    /// depth `i`, both stacks are truncated to `i` before the new probe is
    /// pushed. On a match, both stacks are truncated to the path that led to
    /// it.
    ///
    /// NOTE(review): the memoized stacks are only meaningful for searches
    /// over this same block; presumably callers reset them when switching
    /// blocks (confirm).
    ///
    /// # Safety
    ///
    /// Same as [`Self::archived_item`].
    unsafe fn find_with_cache<const N: usize>(
        &self,
        factories: &Factories<K, A>,
        key_stack: &mut DynVec<K>,
        index_stack: &mut SmallVec<[usize; N]>,
        target: &K,
    ) -> bool {
        unsafe {
            let mut start = 0;
            let mut end = self.n_values();
            // Current depth in the binary search.
            let mut i = 0;
            while start < end {
                let mid = start.midpoint(end);
                if index_stack.get(i) != Some(&mid) {
                    index_stack.truncate(i);
                    index_stack.push(mid);
                    key_stack.truncate(i);
                    key_stack.push_with(&mut |key| self.key(factories, mid, key));
                };
                match target.cmp(&key_stack[i]) {
                    Less => end = mid,
                    Equal => {
                        index_stack.truncate(i + 1);
                        key_stack.truncate(i + 1);
                        return true;
                    }
                    Greater => start = mid + 1,
                };
                i += 1;
            }
            false
        }
    }
}
/// Reports where `range` lies relative to `target`.
///
/// Returns `Greater` when `target` is before the range (`target < start`),
/// `Less` when it is at or after the end (`target >= end`), and `Equal` when
/// the range contains it. The start is tested first, so reversed ranges with
/// `target < start` also yield `Greater`.
fn range_compare<T>(range: &Range<T>, target: T) -> Ordering
where
    T: Ord,
{
    let before_start = target < range.start;
    let at_or_past_end = target >= range.end;
    match (before_start, at_or_past_end) {
        (true, _) => Greater,
        (false, true) => Less,
        (false, false) => Equal,
    }
}
/// Reference to one block in a column's tree, with the rows it covers.
#[derive(Clone, Debug, SizeOf)]
pub(super) struct TreeNode {
    /// Location of the block in the file.
    pub location: BlockLocation,
    /// Whether the block is a data block or an index block.
    pub node_type: NodeType,
    /// Absolute row numbers, within the column, covered by the block.
    #[size_of(skip)]
    pub rows: Range<u64>,
}
impl TreeNode {
    /// Loads the block this node refers to, as a data or index block
    /// according to `node_type`.
    fn read<K, A>(&self, file: &ImmutableFileRef) -> Result<TreeBlock<K, A>, Error>
    where
        K: DataTrait + ?Sized,
        A: DataTrait + ?Sized,
    {
        let block = match self.node_type {
            NodeType::Index => TreeBlock::Index(IndexBlock::new(file, self)?),
            NodeType::Data => TreeBlock::Data(DataBlock::new(file, self)?),
        };
        Ok(block)
    }

    /// Returns the smallest and largest keys under this node.
    ///
    /// For a data block these are its first and last keys. For an index block
    /// they are its first and last stored bounds.
    fn key_range<K, A>(
        &self,
        file: &ImmutableFileRef,
        factories: &Factories<K, A>,
    ) -> Result<(Box<K>, Box<K>), Error>
    where
        K: DataTrait + ?Sized,
        A: DataTrait + ?Sized,
    {
        let mut lowest = factories.key_factory.default_box();
        let mut highest = factories.key_factory.default_box();
        let block = self.read::<K, A>(file)?;
        // SAFETY(review): relies on the block having been validated when it
        // was loaded by `read`. The exact requirements are those of the
        // `key_range` unsafe fns on `DataBlock` and `IndexBlock`.
        unsafe {
            match block {
                TreeBlock::Data(data_block) => {
                    data_block.key_range(factories, lowest.as_mut(), highest.as_mut())
                }
                TreeBlock::Index(index_block) => {
                    index_block.key_range(lowest.as_mut(), highest.as_mut())
                }
            }
        }
        Ok((lowest, highest))
    }
}
/// A loaded tree block of either kind.
enum TreeBlock<K: DataTrait + ?Sized, A: DataTrait + ?Sized> {
    /// A data block holding `(K, A)` items.
    Data(Arc<DataBlock<K, A>>),
    /// An index block whose bounds are keys of type `K`.
    Index(Arc<IndexBlock<K>>),
}
impl<K, A> TreeBlock<K, A>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
{
    /// Looks `node` up in `cache` without touching the file.
    ///
    /// Returns `Ok(None)` on a cache miss. Returns an error if the cached
    /// entry is not of the block type `node` says it should be.
    fn from_cache(
        node: &TreeNode,
        cache: &BufferCache,
        file: &dyn FileReader,
    ) -> Result<Option<Self>, Error> {
        let Some(cache_entry) = cache.get(file, node.location) else {
            return Ok(None);
        };
        let block = match node.node_type {
            NodeType::Data => {
                Self::Data(DataBlock::from_cache_entry(cache_entry, node.location)?)
            }
            NodeType::Index => {
                Self::Index(IndexBlock::from_cache_entry(cache_entry, node.location)?)
            }
        };
        Ok(Some(block))
    }

    /// Parses `raw` as the block described by `node` and inserts it into
    /// `cache`.
    pub(super) fn from_raw_with_cache(
        raw: Arc<FBuf>,
        node: &TreeNode,
        cache: &BufferCache,
        file_id: FileId,
        version: u32,
    ) -> Result<Self, Error> {
        let block = match node.node_type {
            NodeType::Data => Self::Data(DataBlock::from_raw_with_cache(
                raw, node, cache, file_id, version,
            )?),
            NodeType::Index => Self::Index(IndexBlock::from_raw_with_cache(
                raw, node, cache, file_id, version,
            )?),
        };
        Ok(block)
    }

    /// Descends one level towards `row`.
    ///
    /// Returns `Ok(None)` if this is a data block containing `row`. Returns
    /// the child node for `row` if this is an index block containing it.
    /// Otherwise returns [`CorruptionError::MissingRow`].
    fn lookup_row(&self, row: u64) -> Result<Option<TreeNode>, Error> {
        match self {
            Self::Data(data_block) if data_block.rows().contains(&row) => Ok(None),
            Self::Index(index_block) if index_block.rows().contains(&row) => {
                index_block.get_child_by_row(row).map(Some)
            }
            _ => Err(CorruptionError::MissingRow(row).into()),
        }
    }
}
/// A parsed index block. It lists child blocks together with their
/// cumulative row counts and their minimum and maximum key bounds.
pub(super) struct IndexBlock<K>
where
    K: DataTrait + ?Sized,
{
    /// Where this block lives in the file.
    location: BlockLocation,
    /// The block's bytes.
    raw: Arc<FBuf>,
    /// Type of every child block.
    child_type: NodeType,
    /// Byte offsets within `raw` of serialized key bounds, two per child:
    /// the minimum at `2 * i` and the maximum at `2 * i + 1`.
    bounds: VarintReader,
    /// Cumulative number of rows through each child.
    row_totals: VarintReader,
    /// Child block offsets, in 512-byte units.
    child_offsets: VarintReader,
    /// Child block sizes, in 512-byte units.
    child_sizes: VarintReader,
    /// Row number, within the column, of the first row under this block.
    first_row: u64,
    /// File format version, handed to the deserializer.
    version: u32,
    _phantom: PhantomData<K>,
}
impl<K> CacheEntry for IndexBlock<K>
where
    K: DataTrait + ?Sized,
{
    /// Charges the cache for this struct plus the full capacity of its buffer.
    fn cost(&self) -> usize {
        let buffer_bytes = self.raw.capacity();
        buffer_bytes + size_of::<Self>()
    }
}
impl<K> IndexBlock<K>
where
    K: DataTrait + ?Sized,
{
    /// Size in bytes of the unit in which child offsets and sizes are stored.
    const CHILD_UNIT: u64 = 1 << 9;

    /// Parses index block `raw`, found at `location`, whose first row is
    /// `first_row` of its column.
    ///
    /// # Errors
    ///
    /// - [`CorruptionError::Binrw`] if the header cannot be parsed.
    /// - [`CorruptionError::EmptyIndex`] if the block has no children.
    /// - [`CorruptionError::NonmonotonicIndex`] unless the cumulative row
    ///   totals are strictly increasing from zero, i.e. every child contains
    ///   at least one row.
    /// - [`CorruptionError::InvalidArray`] if an array does not fit in the
    ///   block.
    pub(super) fn from_raw(
        raw: Arc<FBuf>,
        location: BlockLocation,
        first_row: u64,
        version: u32,
    ) -> Result<Self, Error> {
        let header =
            IndexBlockHeader::read_le(&mut io::Cursor::new(raw.as_slice())).map_err(|e| {
                Error::Corruption(CorruptionError::Binrw {
                    location,
                    block_type: "index",
                    inner: e.to_string(),
                })
            })?;
        if header.n_children == 0 {
            return Err(CorruptionError::EmptyIndex(location).into());
        }
        let row_totals = VarintReader::new(
            &raw,
            header.row_total_varint,
            header.row_totals_offset as usize,
            header.n_children as usize,
        )?;
        // Totals are cumulative, so each child must add at least one row.
        // Comparing against an implicit leading 0 also rejects an empty first
        // child, whose zero total would make `get_row_bound(1)` underflow.
        let mut prev = 0;
        for i in 0..header.n_children as usize {
            let next = row_totals.get(&raw, i);
            if prev >= next {
                return Err(CorruptionError::NonmonotonicIndex {
                    location,
                    prev,
                    next,
                }
                .into());
            }
            prev = next;
        }
        Ok(Self {
            location,
            child_type: header.child_type,
            bounds: VarintReader::new(
                &raw,
                header.bound_map_varint,
                header.bound_map_offset as usize,
                // A minimum and a maximum bound per child.
                header.n_children as usize * 2,
            )?,
            row_totals,
            child_offsets: VarintReader::new(
                &raw,
                header.child_offset_varint,
                header.child_offsets_offset as usize,
                header.n_children as usize,
            )?,
            child_sizes: VarintReader::new(
                &raw,
                header.child_size_varint,
                header.child_sizes_offset as usize,
                header.n_children as usize,
            )?,
            raw,
            first_row,
            version,
            _phantom: PhantomData,
        })
    }

    /// Parses `raw` as the index block for `node` and inserts the result into
    /// `cache`, keyed by `file_id` and the block's offset.
    pub(super) fn from_raw_with_cache(
        raw: Arc<FBuf>,
        node: &TreeNode,
        cache: &BufferCache,
        file_id: FileId,
        version: u32,
    ) -> Result<Arc<Self>, Error> {
        let block = Arc::new(Self::from_raw(
            raw,
            node.location,
            node.rows.start,
            version,
        )?);
        cache.insert(file_id, node.location.offset, block.clone());
        Ok(block)
    }

    /// Recovers an index block from a cache entry.
    ///
    /// Fails with [`CorruptionError::BadBlockType`] if the entry cached at
    /// `location` is some other type.
    fn from_cache_entry(
        cache_entry: Arc<dyn CacheEntry>,
        location: BlockLocation,
    ) -> Result<Arc<Self>, Error> {
        cache_entry
            .downcast()
            .ok_or(Error::Corruption(CorruptionError::BadBlockType(location)))
    }

    /// Returns the index block for `node`.
    ///
    /// The block comes from the buffer cache when present. Otherwise it is
    /// read via `file.read_block` and then cached. The hit or miss and its
    /// latency are recorded in `file.stats`.
    ///
    /// # Errors
    ///
    /// - [`CorruptionError::MultiplePaths`] if a cached copy was parsed with a
    ///   different first row, i.e. the block is reachable from two positions.
    /// - [`CorruptionError::IndexBlockWrongNumberOfRows`] if the block's total
    ///   row count differs from the size of `node.rows`.
    fn new(file: &ImmutableFileRef, node: &TreeNode) -> Result<Arc<Self>, Error> {
        let start = Instant::now();
        let cache = file.cache();
        let first_row = node.rows.start;
        #[allow(clippy::borrow_deref_ref)]
        let (access, entry) = match cache.get(&*file.file_handle, node.location) {
            Some(entry) => {
                let entry = Self::from_cache_entry(entry, node.location)?;
                if entry.first_row != first_row {
                    return Err(Error::Corruption(CorruptionError::MultiplePaths(
                        node.location,
                    )));
                }
                (CacheAccess::Hit, entry)
            }
            None => {
                let block = file.read_block(node.location)?;
                let entry = Self::from_raw_with_cache(
                    block,
                    node,
                    &cache,
                    file.file_handle.file_id(),
                    file.version,
                )?;
                (CacheAccess::Miss, entry)
            }
        };
        file.stats.record(access, start.elapsed(), node.location);
        let expected_rows = node.rows.end - node.rows.start;
        // `from_raw` rejects empty index blocks, so `count - 1` cannot underflow.
        let n_rows = entry.row_totals.get(&entry.raw, entry.row_totals.count - 1);
        if n_rows != expected_rows {
            return Err(CorruptionError::IndexBlockWrongNumberOfRows {
                location: node.location,
                n_rows,
                expected_rows,
            }
            .into());
        }
        Ok(entry)
    }

    /// Absolute row numbers covered by this block.
    fn rows(&self) -> Range<u64> {
        self.first_row..self.first_row + self.row_totals.get(&self.raw, self.row_totals.count - 1)
    }

    /// Returns the file location of child `index`.
    ///
    /// # Errors
    ///
    /// [`CorruptionError::InvalidChild`] if scaling the stored offset or size
    /// to bytes overflows, or if [`BlockLocation::new`] rejects the result.
    fn get_child_location(&self, index: usize) -> Result<BlockLocation, Error> {
        let invalid = |child_offset: u64, child_size: usize| {
            Error::Corruption(CorruptionError::InvalidChild {
                location: self.location,
                index,
                child_offset,
                child_size,
            })
        };
        let offset_units = self.child_offsets.get(&self.raw, index);
        let size_units = self.child_sizes.get(&self.raw, index);
        // Scale with overflow checks. A plain shift would silently drop the
        // high bits of a corrupt value and could yield a plausible location.
        let offset = offset_units.checked_mul(Self::CHILD_UNIT);
        let size = size_units
            .checked_mul(Self::CHILD_UNIT)
            .and_then(|size| usize::try_from(size).ok());
        let (Some(offset), Some(size)) = (offset, size) else {
            return Err(invalid(
                offset_units.saturating_mul(Self::CHILD_UNIT),
                usize::try_from(size_units.saturating_mul(Self::CHILD_UNIT))
                    .unwrap_or(usize::MAX),
            ));
        };
        BlockLocation::new(offset, size)
            .map_err(|error: InvalidBlockLocation| invalid(error.offset, error.size))
    }

    /// Returns a tree node describing child `index`.
    fn get_child(&self, index: usize) -> Result<TreeNode, Error> {
        Ok(TreeNode {
            location: self.get_child_location(index)?,
            node_type: self.child_type,
            rows: self.get_rows(index),
        })
    }

    /// Returns the tree node for the child that contains absolute row `row`.
    fn get_child_by_row(&self, row: u64) -> Result<TreeNode, Error> {
        self.get_child(self.find_row(row)?)
    }

    /// Absolute rows covered by child `index`.
    fn get_rows(&self, index: usize) -> Range<u64> {
        let low = if index == 0 {
            0
        } else {
            self.row_totals.get(&self.raw, index - 1)
        };
        let high = self.row_totals.get(&self.raw, index);
        (self.first_row + low)..(self.first_row + high)
    }

    /// Returns the block-relative row at bound `index`.
    ///
    /// Even indexes `2 * i` give the first row of child `i`. Odd indexes
    /// `2 * i + 1` give its last row.
    fn get_row_bound(&self, index: usize) -> u64 {
        if index == 0 {
            0
        } else if index % 2 == 1 {
            // Cannot underflow: `from_raw` guarantees every row total is > 0.
            self.row_totals.get(&self.raw, index / 2) - 1
        } else {
            self.row_totals.get(&self.raw, index / 2 - 1)
        }
    }

    /// Binary-searches for the child containing absolute row `row`.
    ///
    /// # Errors
    ///
    /// [`CorruptionError::MissingRow`] if no child contains `row`.
    fn find_row(&self, row: u64) -> Result<usize, Error> {
        let mut indexes = 0..self.n_children();
        while !indexes.is_empty() {
            let mid = indexes.start.midpoint(indexes.end);
            let rows = self.get_rows(mid);
            if row < rows.start {
                indexes.end = mid;
            } else if row >= rows.end {
                indexes.start = mid + 1;
            } else {
                return Ok(mid);
            }
        }
        Err(CorruptionError::MissingRow(row).into())
    }

    /// Deserializes key bound `index` (see [`Self::get_row_bound`] for the
    /// even/odd layout) into `bound`.
    ///
    /// # Safety
    ///
    /// `index` must be below `2 * n_children()`, and the bytes at the stored
    /// offset must be a valid serialized `K`.
    unsafe fn get_bound(&self, index: usize, bound: &mut K) {
        unsafe {
            let offset = self.bounds.get(&self.raw, index) as usize;
            let mut deserializer = Deserializer::new(self.version);
            bound.deserialize_from_bytes_with(&self.raw, offset, &mut deserializer)
        }
    }

    /// Deserializes the block's first bound into `min` and its last bound
    /// into `max`.
    ///
    /// # Safety
    ///
    /// Same as [`Self::get_bound`].
    unsafe fn key_range(&self, min: &mut K, max: &mut K) {
        unsafe {
            self.get_bound(0, min);
            self.max_bound(max);
        }
    }

    /// Deserializes the block's last (maximum) bound into `bound`.
    ///
    /// # Safety
    ///
    /// Same as [`Self::get_bound`].
    unsafe fn max_bound(&self, bound: &mut K) {
        unsafe { self.get_bound(self.last_bound_index(), bound) }
    }

    /// Binary-searches the `2 * n_children()` bounds for the child to descend
    /// into.
    ///
    /// A bound whose row falls outside `target_rows` is ordered by its row
    /// alone. Otherwise its key is loaded into `bound` and ordered by
    /// `compare`. Returns the child of the first bound comparing `Equal`, or
    /// else the child of the last probed bound whose ordering equalled `bias`.
    ///
    /// # Safety
    ///
    /// Same as [`Self::get_bound`].
    unsafe fn find_best_match<C>(
        &self,
        target_rows: &Range<u64>,
        compare: &C,
        bias: Ordering,
        bound: &mut K,
    ) -> Option<usize>
    where
        C: Fn(&K) -> Ordering,
    {
        unsafe {
            let mut start = 0;
            let mut end = self.n_children() * 2;
            let mut result = None;
            while start < end {
                let mid = start.midpoint(end);
                let row = self.get_row_bound(mid) + self.first_row;
                let cmp = match range_compare(target_rows, row) {
                    Equal => {
                        self.get_bound(mid, bound);
                        let cmp = compare(bound);
                        if cmp == Equal {
                            return Some(mid / 2);
                        }
                        cmp
                    }
                    cmp => cmp,
                };
                if cmp == Less {
                    end = mid
                } else {
                    start = mid + 1
                };
                if bias == cmp {
                    result = Some(mid / 2);
                }
            }
            result
        }
    }

    /// Finds the child whose `[min, max]` key bounds contain the first target
    /// in `target_indexes`, searching children from `*start` onward.
    ///
    /// On success, returns that child's index and the number of consecutive
    /// targets (starting with the first) that are `<=` its maximum bound.
    /// `*start` is advanced as the search's lower bound, presumably so that
    /// calls with increasing targets can resume where the previous one
    /// stopped. Returns `None` if the first target lies outside every
    /// remaining child's bounds.
    ///
    /// # Panics
    ///
    /// Panics if `target_indexes` is empty.
    ///
    /// # Safety
    ///
    /// Same as [`Self::get_bound`].
    unsafe fn find_next<I>(
        &self,
        targets: &I,
        mut target_indexes: Range<usize>,
        tmp_key: &mut K,
        start: &mut usize,
    ) -> Option<(usize, usize)>
    where
        I: Index<usize, Output = K> + ?Sized,
    {
        unsafe {
            let start_index = target_indexes.next().unwrap();
            let mut end = self.n_children();
            while *start < end {
                let mid = start.midpoint(end);
                self.get_bound(mid * 2, tmp_key);
                if &targets[start_index] < tmp_key {
                    end = mid;
                } else {
                    *start = mid + 1;
                    self.get_bound(mid * 2 + 1, tmp_key);
                    if &targets[start_index] <= tmp_key {
                        let n = 1 + target_indexes
                            .take_while(|i| &targets[*i] <= tmp_key)
                            .count();
                        return Some((mid, n));
                    }
                }
            }
            None
        }
    }

    /// Number of child blocks.
    fn n_children(&self) -> usize {
        self.child_offsets.count
    }

    /// Index of the block's last (maximum) key bound.
    fn last_bound_index(&self) -> usize {
        self.n_children() * 2 - 1
    }

    /// Applies `compare` to the block's maximum key bound, using a temporary
    /// key from `key_factory`.
    ///
    /// # Safety
    ///
    /// Same as [`Self::get_bound`].
    unsafe fn compare_max<C>(&self, key_factory: &dyn Factory<K>, compare: &C) -> Ordering
    where
        C: Fn(&K) -> Ordering,
    {
        unsafe {
            let mut ordering = Equal;
            key_factory.with(&mut |key| {
                self.max_bound(key);
                ordering = compare(key);
            });
            ordering
        }
    }
}
impl<K> Debug for IndexBlock<K>
where
    K: DataTrait + ?Sized + Debug,
{
    /// Formats the block's first row and child type. It then lists each
    /// child's row range and the result of decoding its location.
    fn fmt(&self, f: &mut Formatter) -> FmtResult {
        write!(
            f,
            "IndexBlock {{ first_row: {}, child_type: {:?}, children = {{",
            self.first_row, self.child_type
        )?;
        for i in 0..self.n_children() {
            if i > 0 {
                write!(f, ",")?;
            }
            write!(
                f,
                " [{i}] = {{ rows: {:?}, location: {:?} }}",
                self.get_rows(i),
                self.get_child_location(i),
            )?;
        }
        // Close the `children` set and then the `IndexBlock` itself, so that
        // every opening brace written above has a matching closing brace.
        write!(f, " }} }}")
    }
}
impl CacheEntry for FileTrailer {
    /// Charges the cache only for the trailer struct itself.
    fn cost(&self) -> usize {
        size_of::<Self>()
    }
}
impl FileTrailer {
    /// Parses a file trailer from the bytes of the block at `location`.
    fn from_raw(raw: Arc<FBuf>, location: BlockLocation) -> Result<Self, Error> {
        let mut cursor = io::Cursor::new(raw.as_slice());
        Self::read_le(&mut cursor).map_err(|e| {
            Error::Corruption(CorruptionError::Binrw {
                location,
                block_type: "trailer",
                inner: e.to_string(),
            })
        })
    }

    /// Returns the trailer at `location`, from the buffer cache when present
    /// or else read from `file_handle` and then cached. The hit or miss and
    /// its latency are recorded in `stats`.
    ///
    /// The block is read directly from `file_handle` rather than through
    /// `decompress`, so no checksum is verified here.
    ///
    /// # Panics
    ///
    /// Panics if `cache` returns `None`.
    fn new(
        cache: fn() -> Option<Arc<BufferCache>>,
        file_handle: &dyn FileReader,
        location: BlockLocation,
        stats: &AtomicCacheStats,
    ) -> Result<Arc<FileTrailer>, Error> {
        let started = Instant::now();
        let buffer_cache = cache().expect("Should have a buffer cache");
        #[allow(clippy::borrow_deref_ref)]
        let cached = buffer_cache.get(&*file_handle, location);
        let (access, trailer) = if let Some(cached) = cached {
            let trailer: Arc<FileTrailer> = cached
                .downcast()
                .ok_or(Error::Corruption(CorruptionError::BadBlockType(location)))?;
            (CacheAccess::Hit, trailer)
        } else {
            let block = file_handle.read_block(location)?;
            let trailer = Arc::new(Self::from_raw(block, location)?);
            buffer_cache.insert(file_handle.file_id(), location.offset, trailer.clone());
            (CacheAccess::Miss, trailer)
        };
        stats.record(access, started.elapsed(), location);
        Ok(trailer)
    }
}
/// One column of a layer file, as described by the file trailer.
#[derive(Debug, SizeOf)]
struct Column {
    /// Root of the column's block tree, or `None` if the column has no rows.
    root: Option<TreeNode>,
    /// Type-erased factories supplied by the caller for this column.
    #[size_of(skip)]
    factories: AnyFactories,
    /// Number of rows in the column.
    n_rows: u64,
}
impl FilterBlock {
    /// Reads the block at `location` directly from `file_handle` and parses
    /// it as a filter block.
    fn new(file_handle: &dyn FileReader, location: BlockLocation) -> Result<Self, Error> {
        let bytes = file_handle.read_block(location)?;
        let mut cursor = io::Cursor::new(bytes.as_slice());
        Self::read_le(&mut cursor).map_err(|e| {
            Error::Corruption(CorruptionError::Binrw {
                location,
                block_type: "filter",
                inner: e.to_string(),
            })
        })
    }
}
impl Column {
fn new(factories: &AnyFactories, info: &FileTrailerColumn) -> Result<Self, Error> {
let FileTrailerColumn {
node_offset,
node_size,
node_type,
n_rows,
} = *info;
let root = if n_rows != 0 {
let location = match BlockLocation::new(node_offset, node_size as usize) {
Ok(location) => location,
Err(_) => {
return Err(Error::Corruption(CorruptionError::InvalidColumnRoot {
node_offset,
node_size,
}));
}
};
Some(TreeNode {
location,
node_type,
rows: 0..n_rows,
})
} else {
None
};
Ok(Self {
root,
n_rows,
factories: factories.clone(),
})
}
}
/// An open layer file plus what is needed to read and cache its blocks.
#[derive(SizeOf)]
struct ImmutableFileRef {
    /// Accessor for the buffer cache.
    cache: fn() -> Option<Arc<BufferCache>>,
    /// The underlying storage file.
    #[size_of(skip)]
    file_handle: Arc<dyn FileReader>,
    /// Block compression used by the file, taken from the trailer.
    compression: Option<Compression>,
    /// Cache hit/miss statistics for block reads through this file.
    stats: AtomicCacheStats,
    /// File format version, taken from the trailer.
    version: u32,
}
impl Debug for ImmutableFileRef {
    /// Shows only the underlying file handle.
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_fmt(format_args!("ImmutableFileRef({:?})", self.file_handle))
    }
}
impl Drop for ImmutableFileRef {
fn drop(&mut self) {
if Arc::strong_count(&self.file_handle) == 1
&& let Some(cache) = (self.cache)()
{
cache.evict(&*self.file_handle);
}
}
}
impl ImmutableFileRef {
    /// Bundles an open file with its cache accessor, compression, statistics
    /// and format version.
    fn new(
        cache: fn() -> Option<Arc<BufferCache>>,
        file_handle: Arc<dyn FileReader>,
        compression: Option<Compression>,
        stats: AtomicCacheStats,
        version: u32,
    ) -> Self {
        Self {
            file_handle,
            cache,
            version,
            stats,
            compression,
        }
    }

    /// Returns the buffer cache.
    ///
    /// # Panics
    ///
    /// Panics if the cache accessor returns `None`.
    fn cache(&self) -> Arc<BufferCache> {
        let accessor = self.cache;
        accessor().expect("Should have a buffer cache")
    }

    /// Evicts this file's entries from the buffer cache.
    #[cfg(test)]
    pub fn evict(&self) {
        let cache = self.cache();
        cache.evict(&*self.file_handle);
    }

    /// Reads the block at `location`, then decompresses it (when the file is
    /// compressed) and verifies its checksum.
    pub fn read_block(&self, location: BlockLocation) -> Result<Arc<FBuf>, Error> {
        let stored = self.file_handle.read_block(location)?;
        decompress(self.compression, location, stored)
    }
}
fn decompress(
compression: Option<Compression>,
location: BlockLocation,
raw: Arc<FBuf>,
) -> Result<Arc<FBuf>, Error> {
let raw = if let Some(compression) = compression {
let compressed_len = u32::from_le_bytes(raw[..4].try_into().unwrap()) as usize;
let Some(compressed) = raw[4..].get(..compressed_len) else {
return Err(CorruptionError::BadCompressedLen {
location,
compressed_len,
max_compressed_len: raw.len() - 4,
}
.into());
};
match compression {
Compression::Snappy => {
let decompressed_len = decompress_len(compressed).map_err(|error| {
Error::Corruption(CorruptionError::Snappy { location, error })
})?;
let mut decompressed = FBuf::with_capacity(decompressed_len);
decompressed.resize(decompressed_len, 0);
match Decoder::new().decompress(compressed, decompressed.as_mut_slice()) {
Ok(n) if n == decompressed_len => {}
Ok(n) => {
return Err(CorruptionError::UnexpectedDecompressionLength {
location,
length: n,
expected_length: decompressed_len,
}
.into());
}
Err(error) => return Err(CorruptionError::Snappy { location, error }.into()),
}
Arc::new(decompressed)
}
}
} else {
raw
};
let computed_checksum = crc32c(&raw[4..]);
let checksum = u32::from_le_bytes(raw[..4].try_into().unwrap());
if checksum != computed_checksum {
return Err(CorruptionError::InvalidChecksum {
location,
magic: raw[4..8].try_into().unwrap(),
checksum,
computed_checksum,
}
.into());
}
Ok(raw)
}
/// Describes, at the type level, how many columns a layer file has.
///
/// `()` describes no columns. `(&'static K, &'static A, N)` describes one
/// column of keys `K` and auxiliary values `A`, followed by the columns that
/// `N` describes.
pub trait ColumnSpec {
    /// Returns the number of columns this type describes.
    fn n_columns() -> usize;
}

impl ColumnSpec for () {
    /// The empty list ends the recursion.
    fn n_columns() -> usize {
        0
    }
}

impl<K, A, N> ColumnSpec for (&'static K, &'static A, N)
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
    N: ColumnSpec,
{
    /// The columns of `N` plus this one.
    fn n_columns() -> usize {
        N::n_columns() + 1
    }
}
/// Reader for a layer file whose column layout is described by `T`
/// (see [`ColumnSpec`]).
#[derive(Debug)]
pub struct Reader<T> {
    /// The underlying file plus its cache accessor, compression and stats.
    file: ImmutableFileRef,
    /// Per-column root node and row count, parsed from the file trailer.
    columns: Vec<Column>,
    /// Batch metadata copied from the file trailer.
    pub(crate) metadata: BatchMetadata,
    _phantom: PhantomData<fn() -> T>,
}
impl<T> SizeOf for Reader<T>
where
    T: ColumnSpec,
{
    /// Accounts for the file reference and the per-column state. The
    /// `metadata` field is not counted.
    fn size_of_children(&self, context: &mut size_of::Context) {
        let Self { file, columns, .. } = self;
        file.size_of_with_context(context);
        columns.size_of_with_context(context);
    }
}
impl<T> Reader<T>
where
T: ColumnSpec,
{
pub(crate) fn new(
factories: &[&AnyFactories],
cache: fn() -> Option<Arc<BufferCache>>,
file: Arc<dyn FileReader>,
) -> Result<Self, Error> {
let (reader, _membership_filter) = Self::new_with_filter(factories, cache, file, None)?;
Ok(reader)
}
/// Opens a layer file from `file` and returns a reader for it together with
/// its membership filter.
///
/// The file must be a positive multiple of 512 bytes; its final 512-byte block
/// is read as the trailer, which is checked for version, feature bits, and
/// column layout. `factories` supplies one factory set per column, and `cache`
/// provides the buffer cache (if any) used for block reads.
///
/// If `membership_filter` is `Some`, it is returned unchanged and no filter
/// block is read. Otherwise the filter is read from the 64-bit location when
/// the trailer reports one (`has_filter64()`), else from the legacy
/// `filter_offset`/`filter_size` location when `filter_offset` is nonzero;
/// if neither applies, the returned filter is `None`.
///
/// # Errors
///
/// - [`CorruptionError::InvalidFileSize`] if the file is smaller than 512
///   bytes or not a multiple of 512 bytes.
/// - [`CorruptionError::InvalidVersion`] if the trailer's version is older
///   than [`VERSION_NUMBER`].
/// - [`CorruptionError::UnsupportedIncompatibleFeatures`] if any incompatible
///   feature bit is set.
/// - [`CorruptionError::NoColumns`] if the trailer lists no columns.
/// - [`Error::WrongNumberOfColumns`] if the trailer's column count differs
///   from `T::n_columns()`.
/// - [`CorruptionError::DecreasingRowCount`] if a column has fewer rows than
///   the column before it.
/// - Any error from reading the file size, the trailer, a column, or the
///   filter block.
///
/// # Panics
///
/// Panics if `factories.len()` differs from the validated column count. That
/// is a caller bug, not a property of the file.
pub(crate) fn new_with_filter(
    factories: &[&AnyFactories],
    cache: fn() -> Option<Arc<BufferCache>>,
    file: Arc<dyn FileReader>,
    membership_filter: Option<TrackingBloomFilter>,
) -> Result<(Self, Option<TrackingBloomFilter>), Error> {
    let file_size = file.get_size()?;
    if file_size < 512 || (file_size % 512) != 0 {
        return Err(CorruptionError::InvalidFileSize(file_size).into());
    }
    let stats = AtomicCacheStats::default();
    // The size check above guarantees that the final 512-byte block exists.
    let file_trailer = FileTrailer::new(
        cache,
        &*file,
        BlockLocation::new(file_size - 512, 512).unwrap(),
        &stats,
    )?;
    if file_trailer.version < VERSION_NUMBER {
        return Err(CorruptionError::InvalidVersion {
            version: file_trailer.version,
            expected_version: VERSION_NUMBER,
        }
        .into());
    }
    // Unsupported *compatible* features are only logged; loading continues.
    if let Some(features) = file_trailer.unsupported_compatible_features() {
        info!(
            "{}: storage file uses unsupported compatible features {features:#x}",
            file.path(),
        );
    }
    if file_trailer.incompatible_features != 0 {
        return Err(CorruptionError::UnsupportedIncompatibleFeatures(
            file_trailer.incompatible_features,
        )
        .into());
    }

    // The column count comes from the file. Validate it with errors, not an
    // assertion, so that a corrupt or mismatched file cannot cause a panic.
    let n_file_columns = file_trailer.columns.len();
    if n_file_columns == 0 {
        return Err(CorruptionError::NoColumns.into());
    }
    if n_file_columns != T::n_columns() {
        return Err(Error::WrongNumberOfColumns {
            actual: n_file_columns,
            expected: T::n_columns(),
        });
    }
    // `factories` comes from the caller, so a mismatch here is a programming
    // error. Checking it also keeps the `zip` below from dropping columns.
    assert_eq!(factories.len(), n_file_columns);

    let columns: Vec<_> = file_trailer
        .columns
        .iter()
        .zip(factories.iter())
        .map(|(info, factories)| Column::new(factories, info))
        .collect::<Result<_, _>>()?;

    // Row counts must not decrease from one column to the next.
    for i in 1..columns.len() {
        let prev_n_rows = columns[i - 1].n_rows;
        let this_n_rows = columns[i].n_rows;
        if this_n_rows < prev_n_rows {
            return Err(CorruptionError::DecreasingRowCount {
                column: i,
                prev_n_rows,
                this_n_rows,
            }
            .into());
        }
    }

    /// Reads the filter block stored at `offset`/`size` in `file_handle`.
    fn read_filter_block(
        file_handle: &dyn FileReader,
        offset: u64,
        size: usize,
    ) -> Result<TrackingBloomFilter, Error> {
        Ok(FilterBlock::new(
            file_handle,
            BlockLocation::new(offset, size).map_err(|error: InvalidBlockLocation| {
                Error::Corruption(CorruptionError::InvalidFilterLocation(error))
            })?,
        )?
        .into())
    }

    // A caller-supplied filter takes precedence. Otherwise prefer the 64-bit
    // filter location, then the legacy one.
    let membership_filter = if let Some(membership_filter) = membership_filter {
        Some(membership_filter)
    } else if file_trailer.has_filter64() {
        Some(read_filter_block(
            &*file,
            file_trailer.filter_offset64,
            file_trailer.filter_size64 as usize,
        )?)
    } else if file_trailer.filter_offset != 0 {
        Some(read_filter_block(
            &*file,
            file_trailer.filter_offset,
            file_trailer.filter_size as usize,
        )?)
    } else {
        None
    };

    Ok((
        Self {
            file: ImmutableFileRef::new(
                cache,
                file,
                file_trailer.compression,
                stats,
                file_trailer.version,
            ),
            columns,
            metadata: file_trailer.metadata.clone(),
            _phantom: PhantomData,
        },
        membership_filter,
    ))
}
pub fn mark_for_checkpoint(&self) {
self.file.file_handle.mark_for_checkpoint();
}
/// Opens the file at `path` through `storage_backend` and builds a reader for
/// it with [`Self::new`].
///
/// # Errors
///
/// Propagates errors from opening the file and from [`Self::new`].
pub fn open(
    factories: &[&AnyFactories],
    cache: fn() -> Option<Arc<BufferCache>>,
    storage_backend: &dyn StorageBackend,
    path: &StoragePath,
) -> Result<Self, Error> {
    let opened = storage_backend.open(path)?;
    Self::new(factories, cache, opened)
}
/// Opens the file at `path` through `storage_backend` and builds a reader for
/// it, loading the membership filter stored in the file (no caller-supplied
/// filter is passed).
///
/// # Errors
///
/// Propagates errors from opening the file and from `new_with_filter`.
pub(crate) fn open_with_filter(
    factories: &[&AnyFactories],
    cache: fn() -> Option<Arc<BufferCache>>,
    storage_backend: &dyn StorageBackend,
    path: &StoragePath,
) -> Result<(Self, Option<TrackingBloomFilter>), Error> {
    let opened = storage_backend.open(path)?;
    Self::new_with_filter(factories, cache, opened, None)
}
pub fn n_columns(&self) -> usize {
T::n_columns()
}
/// Returns the number of rows in `column`.
///
/// # Panics
///
/// Panics if `column` is not a valid column index.
pub fn n_rows(&self, column: usize) -> u64 {
    let col = &self.columns[column];
    col.n_rows
}
/// Returns the storage path of the underlying file.
pub fn path(&self) -> &StoragePath {
    let handle = &self.file.file_handle;
    handle.path()
}
pub fn byte_size(&self) -> Result<u64, Error> {
Ok(self.file.file_handle.get_size()?)
}
/// Test-only: forwards to `evict` on the reader's file reference.
#[cfg(test)]
pub fn evict(&self) {
    let file = &self.file;
    file.evict();
}
/// Returns a snapshot of the cache statistics accumulated for this file.
pub fn cache_stats(&self) -> CacheStats {
    let stats = &self.file.stats;
    stats.read()
}
pub fn file_handle(&self) -> &Arc<dyn FileReader> {
&self.file.file_handle
}
pub fn metadata(&self) -> &BatchMetadata {
&self.metadata
}
}
impl<K, A, N> Reader<(&'static K, &'static A, N)>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
    (&'static K, &'static A, N): ColumnSpec,
{
    /// Returns the key range of column 0, as computed by its root node's
    /// `key_range`, or `None` when column 0 has no root node.
    pub fn key_range(&self) -> Result<Option<(Box<K>, Box<K>)>, Error> {
        let first_column = &self.columns[0];
        match &first_column.root {
            None => Ok(None),
            Some(root) => {
                let factories = first_column.factories.factories::<K, A>();
                Ok(Some(root.key_range(&self.file, &factories)?))
            }
        }
    }

    /// Returns a row group that spans every row of column 0.
    pub fn rows(&self) -> RowGroup<'_, K, A, N, (&'static K, &'static A, N)> {
        let total = self.columns[0].n_rows;
        RowGroup::new(self, 0, 0..total)
    }

    /// Returns a [`BulkRows`] reader that starts at column 0.
    pub fn bulk_rows(&self) -> Result<BulkRows<'_, K, A, N, (&'static K, &'static A, N)>, Error> {
        let first_column = 0;
        BulkRows::new(self, first_column)
    }
}
impl<K, A> Reader<(&'static K, &'static A, ())>
where
    K: DataTrait + ?Sized,
    A: WeightTrait + ?Sized,
{
    /// Starts a [`FetchZSet`] that looks up `keys` in this single-column file.
    pub(crate) fn fetch_zset<'a, 'b>(
        &'a self,
        keys: FilteredKeys<'b, K>,
    ) -> Result<FetchZSet<'a, 'b, K, A>, Error> {
        FetchZSet::<'a, 'b, K, A>::new(self, keys)
    }
}
impl<K0, A0, K1, A1> Reader<(&'static K0, &'static A0, (&'static K1, &'static A1, ()))>
where
    K0: DataTrait + ?Sized,
    A0: DataTrait + ?Sized,
    K1: DataTrait + ?Sized,
    A1: WeightTrait + ?Sized,
{
    /// Starts a [`FetchIndexedZSet`] that looks up `keys` in the first column
    /// of this two-column file.
    pub(crate) fn fetch_indexed_zset<'a, 'b>(
        &'a self,
        keys: FilteredKeys<'b, K0>,
    ) -> Result<FetchIndexedZSet<'a, 'b, K0, A0, K1, A1>, Error> {
        FetchIndexedZSet::<'a, 'b, K0, A0, K1, A1>::new(self, keys)
    }
}
/// A view of `queried_keys`, optionally narrowed to a list of indexes.
pub(crate) struct FilteredKeys<'a, K: DataTrait + ?Sized> {
    /// Every key that was queried.
    queried_keys: &'a DynVec<K>,
    /// When `Some`, the indexes into `queried_keys` that this view exposes,
    /// in view order; when `None`, every queried key is exposed.
    filter_pass_keys: Option<Vec<usize>>,
}
impl<'a, K> FilteredKeys<'a, K>
where
    K: DataTrait + ?Sized,
{
    /// Wraps `queried_keys` without a filter, so every key is visible.
    pub(crate) fn all(queried_keys: &'a DynVec<K>) -> Self {
        Self::with_filter_pass_keys(queried_keys, None)
    }

    /// Wraps `queried_keys`, exposing only the indexes in `filter_pass_keys`
    /// when it is `Some`.
    pub(crate) fn with_filter_pass_keys(
        queried_keys: &'a DynVec<K>,
        filter_pass_keys: Option<Vec<usize>>,
    ) -> Self {
        Self {
            filter_pass_keys,
            queried_keys,
        }
    }

    /// Number of visible keys: the length of the index list when filtered,
    /// otherwise the number of queried keys.
    pub(crate) fn len(&self) -> usize {
        self.filter_pass_keys
            .as_ref()
            .map_or_else(|| self.queried_keys.len(), Vec::len)
    }

    /// Returns `true` when no key is visible.
    pub(crate) fn is_empty(&self) -> bool {
        matches!(self.len(), 0)
    }
}
impl<K> Index<usize> for FilteredKeys<'_, K>
where
    K: DataTrait + ?Sized,
{
    type Output = K;

    /// Returns the `index`th visible key, translating through the filter's
    /// index list when there is one.
    ///
    /// # Panics
    ///
    /// Panics if `index` (or the translated index) is out of range.
    fn index(&self, index: usize) -> &Self::Output {
        let slot = self
            .filter_pass_keys
            .as_ref()
            .map_or(index, |passing| passing[index]);
        &self.queried_keys[slot]
    }
}
/// A contiguous range of rows within one column of a [`Reader`].
pub struct RowGroup<'a, K: DataTrait + ?Sized, A: DataTrait + ?Sized, N, T> {
    /// Reader that owns the column.
    reader: &'a Reader<T>,
    /// Factories for this column's key and auxiliary types.
    factories: Factories<K, A>,
    /// Index of the column within the file.
    column: usize,
    /// Absolute row numbers (within the column) covered by this group.
    rows: Range<u64>,
    /// Marks the use of `K`, `A` and `N` without storing values of them.
    _phantom: PhantomData<fn(&K, &A, N)>,
}
impl<K, A, N, T> Clone for RowGroup<'_, K, A, N, T>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
{
    /// Clones the factories and row range; the reader reference, column index
    /// and marker are `Copy` and are copied as is.
    fn clone(&self) -> Self {
        Self {
            factories: self.factories.clone(),
            rows: self.rows.clone(),
            ..*self
        }
    }
}
impl<K, A, N, T> Debug for RowGroup<'_, K, A, N, T>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
{
    /// Shows the column index and the absolute row range.
    fn fmt(&self, f: &mut Formatter) -> FmtResult {
        let Self { column, rows, .. } = self;
        write!(f, "RowGroup(column={}, rows={:?})", column, rows)
    }
}
impl<'a, K, A, N, T> RowGroup<'a, K, A, N, T>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
    T: ColumnSpec,
{
    /// Creates a row group for the absolute row range `rows` of `column` in
    /// `reader`, using that column's factories.
    fn new(reader: &'a Reader<T>, column: usize, rows: Range<u64>) -> Self {
        Self {
            reader,
            factories: reader.columns[column].factories.factories(),
            column,
            rows,
            _phantom: PhantomData,
        }
    }

    /// Creates a cursor over this group at `position`. When `position` is on a
    /// row, the cursor's key buffer is filled with that row's key.
    ///
    /// # Safety
    ///
    /// Reads the key through the `unsafe` `Position::key`; the caller must
    /// uphold its contract (NOTE(review): presumably that the file's archived
    /// data really has type `K` — confirm).
    unsafe fn cursor(&self, position: Position<K, A>) -> Cursor<'a, K, A, N, T> {
        let mut key = self.factories.key_factory.default_box();
        unsafe { position.key(&self.factories, &mut key) };
        Cursor {
            row_group: self.clone(),
            key,
            position,
        }
    }

    /// Returns `true` if the group contains no rows.
    pub fn is_empty(&self) -> bool {
        self.rows.is_empty()
    }

    /// Returns the number of rows in the group.
    pub fn len(&self) -> u64 {
        self.rows.end - self.rows.start
    }

    /// Returns a cursor positioned before the first row.
    ///
    /// # Safety
    ///
    /// Same contract as [`Self::first`].
    pub unsafe fn before(&self) -> Cursor<'a, K, A, N, T> {
        unsafe { self.cursor(Position::Before) }
    }

    /// Returns a cursor positioned after the last row.
    ///
    /// # Safety
    ///
    /// Same contract as [`Self::first`].
    pub unsafe fn after(&self) -> Cursor<'a, K, A, N, T> {
        unsafe { self.cursor(Position::After { hint: None }) }
    }

    /// Returns a cursor on the first row, or after the end if the group is
    /// empty.
    ///
    /// # Safety
    ///
    /// The cursor's key is read from archived file data (see `cursor` above).
    pub unsafe fn first(&self) -> Result<Cursor<'a, K, A, N, T>, Error> {
        let position = if self.is_empty() {
            Position::After { hint: None }
        } else {
            Position::for_row(self, self.rows.start)?
        };
        Ok(unsafe { self.cursor(position) })
    }

    /// Like [`Self::first`], but reuses `hint`'s path through the tree to find
    /// the first row more cheaply when possible.
    ///
    /// # Safety
    ///
    /// Same contract as [`Self::first`].
    pub unsafe fn first_with_hint(
        &self,
        hint: &Cursor<'a, K, A, N, T>,
    ) -> Result<Cursor<'a, K, A, N, T>, Error> {
        let position = if self.is_empty() {
            Position::After { hint: None }
        } else {
            Position::for_row_from_hint(self, &hint.position, self.rows.start)?
        };
        Ok(unsafe { self.cursor(position) })
    }

    /// Returns a cursor on the last row, or after the end if the group is
    /// empty.
    ///
    /// # Safety
    ///
    /// Same contract as [`Self::first`].
    pub unsafe fn last(&self) -> Result<Cursor<'a, K, A, N, T>, Error> {
        let position = if self.is_empty() {
            Position::After { hint: None }
        } else {
            Position::for_row(self, self.rows.end - 1)?
        };
        Ok(unsafe { self.cursor(position) })
    }

    /// Returns a cursor on row `row`, counted from the start of this group, or
    /// after the end if `row >= self.len()`.
    // NOTE(review): unlike `first`/`last`, this is a safe fn even though it
    // relies on the same unsafe key read.
    pub fn nth(&self, row: u64) -> Result<Cursor<'a, K, A, N, T>, Error> {
        let position = if row < self.len() {
            Position::for_row(self, self.rows.start + row)?
        } else {
            Position::After { hint: None }
        };
        Ok(unsafe { self.cursor(position) })
    }

    /// Returns the sub-group selected by `range`. The range is relative to the
    /// start of this group, so `0..1` selects its first row.
    ///
    /// # Panics
    ///
    /// Panics, like slice indexing, if the range's start is greater than its
    /// end or its end is greater than [`len`](Self::len).
    pub fn subset<B>(&self, range: B) -> Self
    where
        B: RangeBounds<u64>,
    {
        let start = match range.start_bound() {
            Bound::Included(&index) => index,
            Bound::Excluded(&index) => index + 1,
            Bound::Unbounded => 0,
        };
        let end = match range.end_bound() {
            Bound::Included(&index) => index + 1,
            Bound::Excluded(&index) => index,
            Bound::Unbounded => self.len(),
        };
        // Without this check, an inverted range underflows the row-count
        // arithmetic and an oversized one reaches rows outside this group.
        assert!(
            start <= end && end <= self.len(),
            "subset {start}..{end} out of range for row group of {} rows",
            self.len()
        );
        Self {
            rows: self.rows.start + start..self.rows.start + end,
            factories: self.factories.clone(),
            ..*self
        }
    }
}
pub trait FallibleEq {
fn equals(&self, other: &Self) -> Result<bool, Error>;
}
impl<K, A, N> FallibleEq for Reader<(&'static K, &'static A, N)>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
    (&'static K, &'static A, N): ColumnSpec,
    for<'a> RowGroup<'a, K, A, N, (&'static K, &'static A, N)>: FallibleEq,
{
    /// Two readers are equal when all rows of their first columns (and, via
    /// the row-group comparison, their nested columns) are equal.
    fn equals(&self, other: &Self) -> Result<bool, Error> {
        let (ours, theirs) = (self.rows(), other.rows());
        ours.equals(&theirs)
    }
}
impl<'a, K, A, T> FallibleEq for RowGroup<'a, K, A, (), T>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
    T: ColumnSpec,
{
    /// Compares two row groups of the last column row by row.
    ///
    /// Groups of different lengths are unequal. Otherwise each pair of rows is
    /// compared through its archived item.
    fn equals(&self, other: &Self) -> Result<bool, Error> {
        if self.len() != other.len() {
            return Ok(false);
        }
        // `first` returns a cursor tied to `'a`, not to this borrow of `self`,
        // so the row group (and its factories) need not be cloned first.
        let mut sc: Cursor<'a, _, _, _, _> = unsafe { self.first() }?;
        let mut oc: Cursor<'a, _, _, _, _> = unsafe { other.first() }?;
        // Equal lengths mean `oc` has a value whenever `sc` does.
        while sc.has_value() {
            if unsafe { sc.archived_item() != oc.archived_item() } {
                return Ok(false);
            }
            unsafe { sc.move_next() }?;
            unsafe { oc.move_next() }?;
        }
        Ok(true)
    }
}
impl<'a, K, A, NK, NA, NN, T> FallibleEq for RowGroup<'a, K, A, (&'static NK, &'static NA, NN), T>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
    NK: DataTrait + ?Sized,
    NA: DataTrait + ?Sized,
    T: ColumnSpec,
    RowGroup<'a, NK, NA, NN, T>: FallibleEq,
{
    /// Compares two row groups row by row, recursing into the next column's
    /// row group for each row.
    ///
    /// Groups of different lengths are unequal.
    fn equals(&self, other: &Self) -> Result<bool, Error> {
        if self.len() != other.len() {
            return Ok(false);
        }
        // `first` returns a cursor tied to `'a`, so no clone of the row group
        // (and its factories) is needed.
        let mut sc = unsafe { self.first() }?;
        let mut oc = unsafe { other.first() }?;
        // Equal lengths mean `oc` has a value whenever `sc` does.
        while sc.has_value() {
            if unsafe { sc.archived_item() != oc.archived_item() } {
                return Ok(false);
            }
            if !sc.next_column()?.equals(&oc.next_column()?)? {
                return Ok(false);
            }
            unsafe { sc.move_next() }?;
            unsafe { oc.move_next() }?;
        }
        Ok(true)
    }
}
/// A position within a [`RowGroup`], plus a buffer holding the key at that
/// position.
pub struct Cursor<'a, K: DataTrait + ?Sized, A: DataTrait + ?Sized, N, T> {
    /// The rows this cursor moves over.
    row_group: RowGroup<'a, K, A, N, T>,
    /// Where the cursor currently is.
    position: Position<K, A>,
    /// Key at `position`. It is refreshed when the cursor lands on a row and
    /// is meaningful only while the cursor has a value.
    key: Box<K>,
}
impl<K, A, N, T> Clone for Cursor<'_, K, A, N, T>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
{
    /// Clones the row group and position, and deep-copies the key buffer.
    fn clone(&self) -> Self {
        let row_group = self.row_group.clone();
        let key = clone_box(&*self.key);
        let position = self.position.clone();
        Self {
            row_group,
            position,
            key,
        }
    }
}
impl<K, A, N, T> Debug for Cursor<'_, K, A, N, T>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
{
    /// Shows the row group and the position (the key buffer is omitted).
    fn fmt(&self, f: &mut Formatter) -> FmtResult {
        let Self {
            row_group,
            position,
            ..
        } = self;
        write!(f, "Cursor({:?}, {:?})", row_group, position)
    }
}
// Safety note for every `unsafe fn` in this impl: each one reads archived file
// data through the `unsafe` helpers on `Position`/`Path`, so callers inherit
// those helpers' contract (NOTE(review): presumably that the file's archived
// data really has types `K`/`A` — confirm).
impl<'a, K, A, N, T> Cursor<'a, K, A, N, T>
where
K: DataTrait + ?Sized,
A: DataTrait + ?Sized,
T: ColumnSpec,
{
/// Moves to the next row (from `Before`, to the first row), or after the end
/// if there is none, then refreshes the key buffer if the cursor is on a row.
pub unsafe fn move_next(&mut self) -> Result<(), Error> {
self.position.next(&self.row_group)?;
unsafe { self.position.key(&self.row_group.factories, &mut self.key) };
Ok(())
}
/// Moves to the previous row (from `After`, to the last row), or before the
/// start if there is none, then refreshes the key buffer if on a row.
pub unsafe fn move_prev(&mut self) -> Result<(), Error> {
self.position.prev(&self.row_group)?;
unsafe { self.position.key(&self.row_group.factories, &mut self.key) };
Ok(())
}
/// Moves to the first row of the group. `Position::move_to_row` ignores
/// empty groups, so on an empty group the position is left unchanged.
pub unsafe fn move_first(&mut self) -> Result<(), Error> {
self.position
.move_to_row(&self.row_group, self.row_group.rows.start)?;
unsafe { self.position.key(&self.row_group.factories, &mut self.key) };
Ok(())
}
/// Moves to the last row of the group, or after the end if the group is
/// empty.
pub unsafe fn move_last(&mut self) -> Result<(), Error> {
if !self.row_group.is_empty() {
self.position
.move_to_row(&self.row_group, self.row_group.rows.end - 1)?;
unsafe { self.position.key(&self.row_group.factories, &mut self.key) };
} else {
self.position = Position::After { hint: None };
}
Ok(())
}
/// Moves to `row`, counted from the start of the group. If `row` is not less
/// than the group's length, moves after the end (keeping the current path as
/// a hint).
pub unsafe fn move_to_row(&mut self, row: u64) -> Result<(), Error> {
if row < self.row_group.rows.end - self.row_group.rows.start {
self.position
.move_to_row(&self.row_group, self.row_group.rows.start + row)?;
unsafe { self.position.key(&self.row_group.factories, &mut self.key) };
} else {
self.position.move_after();
}
Ok(())
}
/// Returns the key at the current row, or `None` when not on a row.
pub fn key(&self) -> Option<&K> {
self.has_value().then_some(&*self.key)
}
/// Reads the current row's auxiliary value into `aux` and returns it, or
/// returns `None` when not on a row.
pub unsafe fn aux<'b>(&self, aux: &'b mut A) -> Option<&'b mut A> {
unsafe { self.position.aux(&self.row_group.factories, aux) }
}
/// Reads the current row's key and auxiliary value into `item` and returns
/// it, or returns `None` when not on a row.
pub unsafe fn item<'b>(&self, item: (&'b mut K, &'b mut A)) -> Option<(&'b mut K, &'b mut A)> {
unsafe { self.position.item(&self.row_group.factories, item) }
}
/// Returns the current row's archived item, or `None` when not on a row.
pub unsafe fn archived_item(&self) -> Option<&dyn ArchivedItem<'_, K, A>> {
unsafe { self.position.archived_item(&self.row_group.factories) }
}
/// Returns `true` when the cursor is on a row (not before or after).
pub fn has_value(&self) -> bool {
self.position.has_value()
}
/// Number of rows in the cursor's row group.
pub fn len(&self) -> u64 {
self.row_group.len()
}
/// Returns `true` when the cursor's row group has no rows.
pub fn is_empty(&self) -> bool {
self.row_group.is_empty()
}
/// Absolute row number of the cursor: the group's start when before it, the
/// group's end when after it.
pub fn absolute_position(&self) -> u64 {
self.position.absolute_position(&self.row_group)
}
/// Number of rows from the current position to the end of the group,
/// including the current row.
pub fn remaining_rows(&self) -> u64 {
self.position.remaining_rows(&self.row_group)
}
/// Moves forward to the first row at or after the current position whose
/// key satisfies `predicate`. This assumes the predicate is monotone (false,
/// then true) over the keys.
pub unsafe fn seek_forward_until<P>(&mut self, predicate: P) -> Result<(), Error>
where
P: Fn(&K) -> bool + Clone,
{
unsafe {
// Map to the `target.cmp(key)` convention: `Greater` means "keep going".
self.advance_to_first_ge(&|key| {
if predicate(key) { Less } else { Greater }
})
}
}
/// Moves forward to the first row whose key is `>= target`.
pub unsafe fn advance_to_value_or_larger(&mut self, target: &K) -> Result<(), Error> {
unsafe { self.advance_to_first_ge(&|key| target.cmp(key)) }
}
/// Moves forward to the first row for which `compare(key)` is not `Greater`.
/// `compare` follows the `target.cmp(key)` convention, so `Greater` means the
/// key is still below the target. The search never moves backward, and it
/// ends after the group if no such row exists.
// NOTE(review): unlike `rewind_to_last_le`, this does not re-read the key
// after moving; it relies on the search leaving `self.key` holding the key of
// the row it lands on. Confirm against `find_best_match`.
pub unsafe fn advance_to_first_ge<C>(&mut self, compare: &C) -> Result<(), Error>
where
C: Fn(&K) -> Ordering,
{
unsafe {
self.position
.advance_to_first_ge(&self.row_group, compare, &mut *self.key)
}
}
/// Moves backward to the last row whose key satisfies `predicate`. This
/// assumes the predicate is monotone (true, then false) over the keys.
pub unsafe fn seek_backward_until<P>(&mut self, predicate: P) -> Result<(), Error>
where
P: Fn(&K) -> bool + Clone,
{
unsafe {
self.rewind_to_last_le(&|key| {
if !predicate(key) { Less } else { Greater }
})
}
}
/// Moves backward to the last row whose key is `<= target`.
pub unsafe fn rewind_to_value_or_smaller(&mut self, target: &K) -> Result<(), Error>
where
K: Ord,
{
unsafe { self.rewind_to_last_le(&|key| target.cmp(key)) }
}
/// Moves backward to the last row for which `compare(key)` is not `Less`
/// (the `target.cmp(key)` convention).
pub unsafe fn rewind_to_last_le<C>(&mut self, compare: &C) -> Result<(), Error>
where
C: Fn(&K) -> Ordering,
{
// Search the whole group. `self.key` serves as the search's key buffer,
// which is why it is re-read below.
let position = unsafe {
Position::best_match::<N, T, _>(&self.row_group, compare, Greater, &mut *self.key)
}?;
// Only ever move backward, using `Position`'s derived ordering
// (Before < Row by row number < After).
if position < self.position {
self.position = position;
}
unsafe { self.position.key(&self.row_group.factories, &mut self.key) };
Ok(())
}
}
impl<'a, K, A, NK, NA, NN, T> Cursor<'a, K, A, (&'static NK, &'static NA, NN), T>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
    NK: DataTrait + ?Sized,
    NA: DataTrait + ?Sized,
    T: ColumnSpec,
{
    /// Returns the row group in the next column that belongs to the current
    /// row. When the cursor is not on a row, the group is the empty range
    /// `0..0`.
    pub fn next_column<'b>(&'b self) -> Result<RowGroup<'a, NK, NA, NN, T>, Error> {
        let child_rows = self.position.row_group()?;
        let reader = self.row_group.reader;
        let next_column = self.row_group.column + 1;
        Ok(RowGroup::new(reader, next_column, child_rows))
    }
}
/// The route from a column's root to one row: the ancestor index blocks (root
/// first) and the data block that holds the row.
struct Path<K, A>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
{
    /// Absolute row number within the column.
    row: u64,
    /// Index blocks passed through on the way down, starting at the root.
    indexes: Vec<Arc<IndexBlock<K>>>,
    /// Data block containing `row`.
    data: Arc<DataBlock<K, A>>,
}
impl<K, A> PartialEq for Path<K, A>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
{
    /// Two paths are equal when they lead to the same row.
    fn eq(&self, other: &Self) -> bool {
        u64::eq(&self.row, &other.row)
    }
}
impl<K, A> Eq for Path<K, A>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
{
}
impl<K, A> PartialOrd for Path<K, A>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
{
    /// Delegates to the total order by row number.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
impl<K, A> Ord for Path<K, A>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
{
    /// Orders paths by the row they lead to.
    fn cmp(&self, other: &Self) -> Ordering {
        Ord::cmp(&self.row, &other.row)
    }
}
impl<K: DataTrait + ?Sized, A: DataTrait + ?Sized> Clone for Path<K, A> {
fn clone(&self) -> Self {
Self {
row: self.row,
indexes: self.indexes.clone(),
data: self.data.clone(),
}
}
}
/// Appends `index_block` to a path's list of ancestor index blocks.
///
/// # Errors
///
/// Returns [`CorruptionError::TooDeep`], without pushing, if `indexes` already
/// holds `MAX_DEPTH` blocks. This bounds how deep a (possibly corrupt) file's
/// index tree can make a path.
fn push_index_block<K>(
    indexes: &mut Vec<Arc<IndexBlock<K>>>,
    index_block: Arc<IndexBlock<K>>,
) -> Result<(), Error>
where
    K: DataTrait + ?Sized,
{
    const MAX_DEPTH: usize = 64;
    // Use `>=` so that `indexes` never grows past `MAX_DEPTH` entries (`>`
    // allowed `MAX_DEPTH + 1`). The reported depth is the one this push would
    // have produced.
    if indexes.len() >= MAX_DEPTH {
        return Err(CorruptionError::TooDeep {
            depth: indexes.len() + 1,
            max_depth: MAX_DEPTH,
        }
        .into());
    }
    indexes.push(index_block);
    Ok(())
}
// The `unsafe fn`s below read archived data from the path's data block
// (`key_for_row`, `aux_for_row`, ...). Callers inherit that contract
// (NOTE(review): presumably that the archived data really has types `K`/`A`
// — confirm).
impl<K, A> Path<K, A>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
{
    /// Builds the path to absolute `row` by descending from the column's root.
    ///
    /// # Errors
    ///
    /// Returns [`CorruptionError::MissingRow`] if the column has no root node,
    /// and propagates any error from reading or searching the tree.
    fn for_row<N, T>(row_group: &RowGroup<'_, K, A, N, T>, row: u64) -> Result<Self, Error>
    where
        T: ColumnSpec,
    {
        // A column without a root has no rows, so `row` cannot be in it. The
        // root comes from the file, so report corruption instead of panicking.
        let root = row_group.reader.columns[row_group.column]
            .root
            .clone()
            .ok_or(CorruptionError::MissingRow(row))?;
        Self::for_row_from_ancestor(row_group, Vec::new(), root, row)
    }

    /// Descends from `node` to the data block containing `row`. Each index
    /// block passed through is appended to `indexes`, which holds `node`'s
    /// ancestors, root first.
    fn for_row_from_ancestor<N, T>(
        row_group: &RowGroup<'_, K, A, N, T>,
        mut indexes: Vec<Arc<IndexBlock<K>>>,
        mut node: TreeNode,
        row: u64,
    ) -> Result<Self, Error>
    where
        T: ColumnSpec,
    {
        loop {
            let block = node.read(&row_group.reader.file)?;
            let next = block.lookup_row(row)?;
            match block {
                TreeBlock::Data(data) => {
                    return Ok(Self { row, indexes, data });
                }
                TreeBlock::Index(index) => {
                    push_index_block(&mut indexes, index)?;
                }
            };
            // NOTE(review): assumes `lookup_row` always yields a child for an
            // index block — confirm.
            node = next.unwrap();
        }
    }

    /// Builds the path to absolute `row`, reusing as much of `hint` as it can:
    /// the hint's data block if that already contains `row`, otherwise the
    /// deepest ancestor index block on the hint's path that covers `row`.
    /// Without a hint this is [`Self::for_row`].
    ///
    /// # Errors
    ///
    /// Returns [`CorruptionError::MissingRow`] if no block on the hint's path
    /// covers `row`.
    fn for_row_from_hint<N, T>(
        row_group: &RowGroup<'_, K, A, N, T>,
        hint: Option<&Self>,
        row: u64,
    ) -> Result<Self, Error>
    where
        T: ColumnSpec,
    {
        let Some(hint) = hint else {
            return Self::for_row(row_group, row);
        };
        if hint.data.rows().contains(&row) {
            return Ok(Self {
                row,
                ..hint.clone()
            });
        }
        // Walk up from the deepest ancestor until one covers `row`, then
        // descend again from there.
        for (idx, index_block) in hint.indexes.iter().enumerate().rev() {
            if index_block.rows().contains(&row) {
                let node = index_block.get_child_by_row(row)?;
                return Self::for_row_from_ancestor(
                    row_group,
                    hint.indexes[0..=idx].to_vec(),
                    node,
                    row,
                );
            }
        }
        Err(CorruptionError::MissingRow(row).into())
    }

    /// Reads this row's key into `key`.
    unsafe fn key(&self, factories: &Factories<K, A>, key: &mut K) {
        unsafe { self.data.key_for_row(factories, self.row, key) }
    }

    /// Reads this row's auxiliary value into `aux`.
    unsafe fn aux(&self, factories: &Factories<K, A>, aux: &mut A) {
        unsafe { self.data.aux_for_row(factories, self.row, aux) }
    }

    /// Reads this row's key and auxiliary value into `item`.
    unsafe fn item(&self, factories: &Factories<K, A>, item: (&mut K, &mut A)) {
        unsafe { self.data.item_for_row(factories, self.row, item) }
    }

    /// Returns this row's archived item.
    unsafe fn archived_item(&self, factories: &Factories<K, A>) -> &dyn ArchivedItem<'_, K, A> {
        unsafe { self.data.archived_item_for_row(factories, self.row) }
    }

    /// Returns the row range that the data block reports for this row. This
    /// is the range `Cursor::next_column` uses for the next column.
    fn row_group(&self) -> Result<Range<u64>, Error> {
        self.data.row_group_for_row(self.row)
    }

    /// Moves to absolute `row`. Stays in the current data block when it
    /// contains `row`; otherwise rebuilds the path using `self` as a hint.
    fn move_to_row<N, T>(
        &mut self,
        row_group: &RowGroup<'_, K, A, N, T>,
        row: u64,
    ) -> Result<(), Error>
    where
        T: ColumnSpec,
    {
        if self.data.rows().contains(&row) {
            self.row = row;
        } else {
            *self = Self::for_row_from_hint(row_group, Some(self), row)?;
        }
        Ok(())
    }

    /// Descends from the column's root, choosing a child in each block with
    /// its `find_best_match` (restricted to `row_group.rows`, using `bias`).
    /// Returns `None` if the column has no root or some block reports no
    /// match. `key` is passed to the searches as their key buffer.
    unsafe fn best_match<N, T, C>(
        row_group: &RowGroup<'_, K, A, N, T>,
        compare: &C,
        bias: Ordering,
        key: &mut K,
    ) -> Result<Option<Self>, Error>
    where
        T: ColumnSpec,
        C: Fn(&K) -> Ordering,
    {
        unsafe {
            let mut indexes = Vec::new();
            let Some(mut node) = row_group.reader.columns[row_group.column].root.clone() else {
                return Ok(None);
            };
            loop {
                match node.read(&row_group.reader.file)? {
                    TreeBlock::Index(index_block) => {
                        let Some(child_idx) =
                            index_block.find_best_match(&row_group.rows, compare, bias, key)
                        else {
                            return Ok(None);
                        };
                        node = index_block.get_child(child_idx)?;
                        push_index_block(&mut indexes, index_block)?;
                    }
                    TreeBlock::Data(data_block) => {
                        return Ok(data_block
                            .find_best_match(
                                &row_group.factories,
                                &row_group.rows,
                                compare,
                                bias,
                                key,
                            )
                            .map(|child_idx| Self {
                                row: data_block.first_row + child_idx as u64,
                                indexes,
                                data: data_block,
                            }));
                    }
                }
            }
        }
    }

    /// Moves forward to the first row, at or after the current one and within
    /// `row_group`, for which `compare(key)` is not `Greater`. Returns
    /// `Ok(false)` if there is no such row.
    ///
    /// On entry `key` must hold the current row's key. It is then reused as
    /// the search's key buffer. After `Ok(false)` or an error the path may be
    /// left inconsistent (ancestors popped); callers discard it in that case.
    unsafe fn advance_to_first_ge<N, T, C>(
        &mut self,
        row_group: &RowGroup<'_, K, A, N, T>,
        compare: &C,
        key: &mut K,
    ) -> Result<bool, Error>
    where
        T: ColumnSpec,
        C: Fn(&K) -> Ordering,
    {
        unsafe {
            // The current row already qualifies.
            if compare(key) != Greater {
                return Ok(true);
            }
            // Check the last row of the current data block that lies inside
            // the row group: if it qualifies, the answer is in this block.
            self.data.key_for_row(
                &row_group.factories,
                min(self.data.rows().end, row_group.rows.end) - 1,
                key,
            );
            let mut rows = self.row + 1..row_group.rows.end;
            if compare(key) != Greater {
                let child_idx = self
                    .data
                    .find_best_match(&row_group.factories, &rows, compare, Less, key)
                    .unwrap();
                self.row = self.data.first_row + child_idx as u64;
                return Ok(true);
            }
            // Otherwise climb the ancestors, skipping any whose maximum key is
            // still too small, then descend into the first qualifying subtree.
            rows.start = self.data.rows().end;
            while let Some(index_block) = self.indexes.pop() {
                if rows.end > index_block.rows().end
                    && index_block.compare_max(row_group.factories.key_factory, compare) == Greater
                {
                    rows.start = index_block.rows().end;
                    continue;
                }
                let Some(child_idx) = index_block.find_best_match(&rows, compare, Less, key) else {
                    return Ok(false);
                };
                let mut node = index_block.get_child(child_idx)?;
                push_index_block(&mut self.indexes, index_block)?;
                loop {
                    match node.read::<K, A>(&row_group.reader.file)? {
                        TreeBlock::Index(index_block) => {
                            let Some(child_idx) =
                                index_block.find_best_match(&rows, compare, Less, key)
                            else {
                                return Ok(false);
                            };
                            node = index_block.get_child(child_idx)?;
                            push_index_block(&mut self.indexes, index_block)?;
                        }
                        TreeBlock::Data(data_block) => {
                            let Some(child_idx) = data_block.find_best_match(
                                &row_group.factories,
                                &rows,
                                compare,
                                Less,
                                key,
                            ) else {
                                return Ok(false);
                            };
                            self.row = child_idx as u64 + data_block.first_row;
                            self.data = data_block;
                            return Ok(true);
                        }
                    }
                }
            }
            Ok(false)
        }
    }
}
impl<K, A> Debug for Path<K, A>
where
    K: DataTrait + ?Sized,
    A: DataTrait + ?Sized,
{
    /// Shows the absolute row, which child each ancestor index block follows
    /// (with that child's row bounds), and the row's offset in its data block.
    fn fmt(&self, f: &mut Formatter) -> FmtResult {
        write!(f, "Path {{ row: {}, indexes:", self.row)?;
        self.indexes.iter().try_for_each(|index| {
            let n = index.n_children();
            if let Ok(i) = index.find_row(self.row) {
                let (min_row, max_row) = (index.get_row_bound(i * 2), index.get_row_bound(i * 2 + 1));
                write!(f, "\n[child {i} of {n}: rows {min_row}..={max_row}]")
            } else {
                write!(f, " [unknown child of {n}]")
            }
        })?;
        let offset = self.row - self.data.first_row;
        write!(f, ", data: [row {} of {}] }}", offset, self.data.n_values())
    }
}
/// Where a cursor is within its row group.
///
/// The derived ordering puts `Before` first, then `Row` (ordered by row
/// number, per `Path`'s `Ord`), then `After`.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Position<K: DataTrait + ?Sized, A: DataTrait + ?Sized> {
    /// Before the first row.
    Before,
    /// On the row the path leads to.
    Row(Path<K, A>),
    /// After the last row. It may keep the last path as a hint for moving
    /// back into the tree.
    After { hint: Option<Path<K, A>> },
}
impl<K, A> Clone for Position<K, A>
where
K: DataTrait + ?Sized,
A: DataTrait + ?Sized,
{
fn clone(&self) -> Self {
match self {
Position::Before => Position::Before,
Position::Row(path) => Position::Row(path.clone()),
Position::After { hint } => Position::After { hint: hint.clone() },
}
}
}
// State transitions for cursor positions. Row numbers here are absolute
// (within the column), not relative to the row group. The `unsafe fn`s read
// archived data through `Path`'s unsafe helpers and inherit their contract.
impl<K, A> Position<K, A>
where
K: DataTrait + ?Sized,
A: DataTrait + ?Sized,
{
/// Positions on absolute `row`, descending from the column root.
fn for_row<N, T>(row_group: &RowGroup<'_, K, A, N, T>, row: u64) -> Result<Self, Error>
where
T: ColumnSpec,
{
Ok(Self::Row(Path::for_row(row_group, row)?))
}
/// Positions on absolute `row`, reusing `hint`'s path (its current path or
/// its `After` hint) when it has one.
fn for_row_from_hint<N, T>(
row_group: &RowGroup<'_, K, A, N, T>,
hint: &Self,
row: u64,
) -> Result<Self, Error>
where
T: ColumnSpec,
{
Ok(Self::Row(Path::for_row_from_hint(
row_group,
hint.hint(),
row,
)?))
}
/// Steps forward one row: `Before` goes to the group's first row, `Row` goes
/// to the following row, and `After` stays put. Stepping past the end moves
/// to `After`, keeping the current path as a hint.
fn next<N, T>(&mut self, row_group: &RowGroup<'_, K, A, N, T>) -> Result<(), Error>
where
T: ColumnSpec,
{
let row = match self {
Self::Before => row_group.rows.start,
Self::Row(path) => path.row + 1,
Self::After { .. } => return Ok(()),
};
if row < row_group.rows.end {
self.move_to_row(row_group, row)
} else {
self.move_after();
Ok(())
}
}
/// Steps back one row: `Before` stays put; `Row` goes to the previous row, or
/// to `Before` at the group's first row; `After` goes to the last row (using
/// its hint), or to `Before` if the group is empty.
fn prev<N, T>(&mut self, row_group: &RowGroup<'_, K, A, N, T>) -> Result<(), Error>
where
T: ColumnSpec,
{
match self {
Self::Before => (),
Self::Row(path) => {
if path.row > row_group.rows.start {
path.move_to_row(row_group, path.row - 1)?;
} else {
*self = Self::Before;
}
}
Self::After { hint } => {
*self = if !row_group.is_empty() {
Self::Row(Path::for_row_from_hint(
row_group,
hint.as_ref(),
row_group.rows.end - 1,
)?)
} else {
Self::Before
}
}
}
Ok(())
}
/// Moves to absolute `row`, reusing the current path or hint where
/// possible. Does nothing when the row group is empty.
fn move_to_row<N, T>(
&mut self,
row_group: &RowGroup<'_, K, A, N, T>,
row: u64,
) -> Result<(), Error>
where
T: ColumnSpec,
{
if !row_group.rows.is_empty() {
match self {
Position::Before => *self = Self::Row(Path::for_row(row_group, row)?),
Position::After { hint } => {
*self = Self::Row(Path::for_row_from_hint(row_group, hint.as_ref(), row)?)
}
Position::Row(path) => path.move_to_row(row_group, row)?,
}
}
Ok(())
}
/// Moves to `After`, keeping the current path (or existing hint) as the hint.
fn move_after(&mut self) {
*self = Position::After {
hint: self.take_hint(),
};
}
/// Takes the current path or hint out of `self`, leaving `self` as `Before`.
fn take_hint(&mut self) -> Option<Path<K, A>> {
match replace(self, Position::Before) {
Position::Before => None,
Position::Row(hint) => Some(hint),
Position::After { hint } => hint,
}
}
/// Borrows the current path (on a row) or the stored hint (after the end).
fn hint(&self) -> Option<&Path<K, A>> {
match self {
Position::Before => None,
Position::Row(path) => Some(path),
Position::After { hint } => hint.as_ref(),
}
}
/// Borrows the current path, only when on a row.
fn path(&self) -> Option<&Path<K, A>> {
match self {
Position::Before => None,
Position::Row(path) => Some(path),
Position::After { .. } => None,
}
}
/// Reads the current row's key into `key` and returns it; returns `None`
/// (leaving `key` untouched) when not on a row.
pub unsafe fn key<'k>(&self, factories: &Factories<K, A>, key: &'k mut K) -> Option<&'k mut K> {
unsafe {
self.path().map(|path| {
path.key(factories, key);
key
})
}
}
/// Reads the current row's auxiliary value into `aux` and returns it;
/// returns `None` when not on a row.
pub unsafe fn aux<'a>(&self, factories: &Factories<K, A>, aux: &'a mut A) -> Option<&'a mut A> {
unsafe {
self.path().map(|path| {
path.aux(factories, aux);
aux
})
}
}
/// Reads the current row's key and auxiliary value into `item` and returns
/// it; returns `None` when not on a row.
pub unsafe fn item<'a>(
&self,
factories: &Factories<K, A>,
item: (&'a mut K, &'a mut A),
) -> Option<(&'a mut K, &'a mut A)> {
unsafe {
self.path().map(|path| {
path.item(factories, (item.0, item.1));
item
})
}
}
/// Returns the current row's archived item, or `None` when not on a row.
pub unsafe fn archived_item(
&self,
factories: &Factories<K, A>,
) -> Option<&dyn ArchivedItem<'_, K, A>> {
unsafe { self.path().map(|path| path.archived_item(factories)) }
}
/// Returns the rows of the next column that belong to the current row, or
/// the empty range `0..0` when not on a row.
pub fn row_group(&self) -> Result<Range<u64>, Error> {
match self.path() {
Some(path) => path.row_group(),
None => Ok(0..0),
}
}
/// Returns `true` when on a row.
fn has_value(&self) -> bool {
self.path().is_some()
}
/// Wraps `Path::best_match`. When no row matches, returns `After` for a
/// `Less` bias (forward search) and `Before` otherwise.
unsafe fn best_match<N, T, C>(
row_group: &RowGroup<'_, K, A, N, T>,
compare: &C,
bias: Ordering,
key: &mut K,
) -> Result<Self, Error>
where
T: ColumnSpec,
C: Fn(&K) -> Ordering,
{
unsafe {
match Path::best_match(row_group, compare, bias, key)? {
Some(path) => Ok(Position::Row(path)),
None => Ok(if bias == Less {
Position::After { hint: None }
} else {
Position::Before
}),
}
}
}
/// Absolute row number: the group's start for `Before`, the current row for
/// `Row`, and the group's end for `After`.
fn absolute_position<N, T>(&self, row_group: &RowGroup<K, A, N, T>) -> u64
where
T: ColumnSpec,
{
match self {
Position::Before => row_group.rows.start,
Position::Row(path) => path.row,
Position::After { .. } => row_group.rows.end,
}
}
/// Rows from here to the end of the group, including the current row.
fn remaining_rows<N, T>(&self, row_group: &RowGroup<K, A, N, T>) -> u64
where
T: ColumnSpec,
{
match self {
Position::Before => row_group.len(),
Position::Row(path) => row_group.rows.end - path.row,
Position::After { .. } => 0,
}
}
/// Moves forward to the first row whose `compare(key)` is not `Greater`.
/// From `Before`, searches the whole group; from `After`, does nothing; from
/// `Row`, advances the path. If no row qualifies, or the search fails, the
/// position becomes `After` with no hint (the error is still returned).
unsafe fn advance_to_first_ge<N, T, C>(
&mut self,
row_group: &RowGroup<'_, K, A, N, T>,
compare: &C,
key: &mut K,
) -> Result<(), Error>
where
T: ColumnSpec,
C: Fn(&K) -> Ordering,
{
unsafe {
match self {
Position::Before => {
*self = Self::best_match::<N, T, _>(row_group, compare, Less, key)?;
}
Position::After { .. } => (),
Position::Row(path) => {
// A failed or unsuccessful advance can leave `path` partially
// unwound, so it is discarded rather than kept as a hint.
match path.advance_to_first_ge(row_group, compare, key) {
Ok(false) => {
*self = Position::After { hint: None };
}
Ok(true) => (),
Err(error) => {
*self = Position::After { hint: None };
return Err(error);
}
}
}
}
Ok(())
}
}
}