use std::{path::Path, sync::Arc};
use crate::iceberg::{version::DataFileMeta, DeletionVector};
use crate::memtable::iterator::MemEntry;
use crate::parquet::reader::ParquetReader;
use crate::types::{
key::InternalKey,
level::Level,
schema::TableSchema,
sequence::{OpType, SeqNum},
value::{FieldValue, Row},
MeruError, Result,
};
use bytes::Bytes;
use roaring::RoaringBitmap;
use tracing::{debug, instrument, trace};
use crate::engine::engine::MeruEngine;
/// Open the parquet data file described by `file` (resolved against `base`)
/// and, when the file carries deletion-vector coordinates, decode its
/// deletion bitmap from the referenced puffin blob.
///
/// The three DV fields must be either all present or all absent; any other
/// combination is reported as corruption, as is a blob range that falls
/// outside the puffin file's bytes.
fn open_file(
    base: &Path,
    file: &DataFileMeta,
    schema: Arc<TableSchema>,
) -> Result<(ParquetReader<Bytes>, Option<RoaringBitmap>)> {
    let data_path = base.join(&file.path);
    let data_bytes = std::fs::read(&data_path).map_err(MeruError::Io)?;
    let reader = ParquetReader::open(Bytes::from(data_bytes), schema)?;

    let bitmap = match (&file.dv_path, file.dv_offset, file.dv_length) {
        // No deletion vector attached to this file.
        (None, None, None) => None,
        // Fully-specified coordinates: slice the blob out of the puffin file.
        (Some(dv_path), Some(offset), Some(length)) => {
            let puffin = std::fs::read(base.join(dv_path)).map_err(MeruError::Io)?;
            let start = offset as usize;
            // Guard against offset+length wrapping on this platform's usize.
            let end = start
                .checked_add(length as usize)
                .ok_or_else(|| MeruError::Corruption("DV offset+length overflow".into()))?;
            if end > puffin.len() {
                return Err(MeruError::Corruption(format!(
                    "DV blob out of range: path={dv_path} offset={offset} length={length} puffin_len={}",
                    puffin.len()
                )));
            }
            let dv = DeletionVector::from_puffin_blob(&puffin[start..end])?;
            Some(dv.bitmap().clone())
        }
        // Partially-populated coordinates are never valid.
        _ => {
            return Err(MeruError::Corruption(format!(
                "inconsistent DV coords on file {}: dv_path={:?} dv_offset={:?} dv_length={:?}",
                file.path, file.dv_path, file.dv_offset, file.dv_length
            )));
        }
    };

    Ok((reader, bitmap))
}
#[instrument(skip(engine), fields(op = "point_lookup"))]
pub fn point_lookup(engine: &MeruEngine, pk_values: &[FieldValue]) -> Result<Option<Row>> {
let read_seq = engine.read_seq();
let ikey = InternalKey::encode(pk_values, read_seq, OpType::Put, &engine.schema)?;
let user_key_bytes = ikey.user_key_bytes().to_vec();
if let Some(entry) = engine.memtable.get(&user_key_bytes, read_seq) {
if entry.op_type == OpType::Delete {
return Ok(None); }
let row = crate::engine::codec::decode_row(&entry.value)?;
trace!(source = "memtable", "cache hit");
return Ok(Some(row));
}
if let Some(ref cache) = engine.row_cache {
if let Some(entry) = cache.get(&user_key_bytes) {
if entry.op_type == OpType::Delete {
return Ok(None);
}
trace!(source = "row_cache", "cache hit");
return Ok(Some(entry.row));
}
}
let cache_gen = engine.row_cache.as_ref().map(|c| c.snapshot_generation());
let (_pin, version) = engine.pin_current_snapshot();
let base = engine.catalog.base_path();
for file in version.files_at(Level(0)) {
if !range_contains(&file.meta.key_min, &file.meta.key_max, &user_key_bytes) {
continue;
}
let (reader, dv) = open_file(base, file, engine.schema.clone())?;
if let Some((hit_ikey, row)) = reader.get(&user_key_bytes, read_seq, dv.as_ref())? {
if let (Some(ref cache), Some(gen)) = (&engine.row_cache, cache_gen) {
cache.insert_if_fresh(
user_key_bytes.clone(),
crate::engine::cache::CacheEntry {
op_type: hit_ikey.op_type,
row: row.clone(),
},
gen,
);
}
if hit_ikey.op_type == OpType::Delete {
return Ok(None);
}
debug!(source = "L0", file = %file.path, "point lookup hit");
return Ok(Some(row));
}
}
let max_level = version.max_level();
for lvl in 1..=max_level.0 {
let level = Level(lvl);
let Some(file) = version.find_file_for_key(level, &user_key_bytes) else {
continue;
};
let (reader, dv) = open_file(base, file, engine.schema.clone())?;
if let Some((hit_ikey, row)) = reader.get(&user_key_bytes, read_seq, dv.as_ref())? {
if let (Some(ref cache), Some(gen)) = (&engine.row_cache, cache_gen) {
cache.insert_if_fresh(
user_key_bytes.clone(),
crate::engine::cache::CacheEntry {
op_type: hit_ikey.op_type,
row: row.clone(),
},
gen,
);
}
if hit_ikey.op_type == OpType::Delete {
return Ok(None);
}
debug!(source = %format!("L{}", lvl), file = %file.path, "point lookup hit");
return Ok(Some(row));
}
}
trace!("point lookup miss");
Ok(None)
}
/// Look up an already-encoded user key as of an explicit read sequence,
/// bypassing the row cache entirely.
///
/// Same resolution order as [`point_lookup`]: memtable first, then the L0
/// files whose key range covers the key, then one candidate file per level.
/// A `Delete` hit anywhere resolves the lookup to `None`.
pub fn point_lookup_at_seq(
    engine: &MeruEngine,
    user_key_bytes: &[u8],
    read_seq: SeqNum,
) -> Result<Option<Row>> {
    // Newest data lives in the memtable; a hit there is final.
    if let Some(entry) = engine.memtable.get(user_key_bytes, read_seq) {
        return match entry.op_type {
            OpType::Delete => Ok(None),
            _ => Ok(Some(crate::engine::codec::decode_row(&entry.value)?)),
        };
    }

    let (_pin, version) = engine.pin_current_snapshot();
    let base = engine.catalog.base_path();

    // L0: every file whose [key_min, key_max] range covers the key is a candidate.
    for file in version.files_at(Level(0)) {
        let covers = range_contains(&file.meta.key_min, &file.meta.key_max, user_key_bytes);
        if !covers {
            continue;
        }
        let (reader, dv) = open_file(base, file, engine.schema.clone())?;
        match reader.get(user_key_bytes, read_seq, dv.as_ref())? {
            Some((hit, _)) if hit.op_type == OpType::Delete => return Ok(None),
            Some((_, row)) => return Ok(Some(row)),
            None => {}
        }
    }

    // L1..=max: `find_file_for_key` yields at most one candidate per level.
    for lvl in 1..=version.max_level().0 {
        if let Some(file) = version.find_file_for_key(Level(lvl), user_key_bytes) {
            let (reader, dv) = open_file(base, file, engine.schema.clone())?;
            match reader.get(user_key_bytes, read_seq, dv.as_ref())? {
                Some((hit, _)) if hit.op_type == OpType::Delete => return Ok(None),
                Some((_, row)) => return Ok(Some(row)),
                None => {}
            }
        }
    }

    Ok(None)
}
/// Report whether `probe` lies within the inclusive byte range
/// `[key_min, key_max]`, where an empty bound means "unbounded" on that side.
fn range_contains(key_min: &[u8], key_max: &[u8], probe: &[u8]) -> bool {
    let above_min = key_min.is_empty() || probe >= key_min;
    let below_max = key_max.is_empty() || probe <= key_max;
    above_min && below_max
}
/// Scan all rows whose user key lies in `[start_pk, end_pk)` at the engine's
/// current read sequence (`None` bound = unbounded on that side).
///
/// Candidate versions are gathered from the memtable and from every data file
/// whose key range may intersect the scan range, then merged: the newest
/// version of each user key wins, and delete tombstones suppress the key
/// from the output.
#[instrument(skip(engine), fields(op = "range_scan"))]
pub fn range_scan(
    engine: &MeruEngine,
    start_pk: Option<&[FieldValue]>,
    end_pk: Option<&[FieldValue]>,
) -> Result<Vec<(InternalKey, Row)>> {
    let (_pin, version) = engine.pin_current_snapshot();
    let read_seq = engine.read_seq();
    // Encode the optional PK bounds into user-key byte form. The seq/op used
    // for encoding do not matter here: only the user-key prefix is kept.
    let start_bytes = start_pk
        .map(|pk| {
            InternalKey::encode(pk, read_seq, OpType::Put, &engine.schema)
                .map(|ik| ik.user_key_bytes().to_vec())
        })
        .transpose()?;
    let end_bytes = end_pk
        .map(|pk| {
            InternalKey::encode(pk, read_seq, OpType::Put, &engine.schema)
                .map(|ik| ik.user_key_bytes().to_vec())
        })
        .transpose()?;
    // All candidate versions of all candidate keys, before merge/dedup.
    let mut harvest: Vec<(InternalKey, Row, OpType)> = Vec::new();
    let mem_snapshots = engine.memtable.snapshot_entries(read_seq);
    let mut mem_all: Vec<MemEntry> = Vec::new();
    for s in mem_snapshots {
        mem_all.extend(s);
    }
    for entry in &mem_all {
        let uk = entry.user_key.as_ref();
        // Range filter: start is inclusive, end is exclusive.
        if let Some(ref start) = start_bytes {
            if uk < start.as_slice() {
                continue;
            }
        }
        if let Some(ref end) = end_bytes {
            if uk >= end.as_slice() {
                continue;
            }
        }
        // Re-encode the memtable entry in wire form: user_key || be64(tag),
        // with tag = (SEQNUM_MAX - seq) << 8 | op. Inverting the sequence
        // makes newer versions of the same user key sort first under a
        // byte-wise ordering of the wire key.
        let tag = (crate::types::sequence::SEQNUM_MAX.0 - entry.seq.0) << 8
            | (entry.entry.op_type as u64);
        let mut wire = Vec::with_capacity(uk.len() + 8);
        wire.extend_from_slice(uk);
        wire.extend_from_slice(&tag.to_be_bytes());
        let ikey = InternalKey::decode(&wire, &engine.schema)?;
        // Deletes (and empty values) carry a placeholder row; they are
        // dropped after the merge but must participate in shadowing.
        let row: Row = if entry.entry.op_type == OpType::Put && !entry.entry.value.is_empty() {
            crate::engine::codec::decode_row(&entry.entry.value)?
        } else {
            Row::default()
        };
        harvest.push((ikey, row, entry.entry.op_type));
    }
    let base = engine.catalog.base_path();
    let max_level = version.max_level();
    for lvl in 0..=max_level.0 {
        let level = Level(lvl);
        for file in version.files_at(level) {
            // Skip files whose [key_min, key_max] range cannot intersect the
            // scan range; an empty min/max bound means "unbounded".
            if let Some(ref start) = start_bytes {
                if !file.meta.key_max.is_empty() && file.meta.key_max.as_slice() < start.as_slice()
                {
                    continue;
                }
            }
            if let Some(ref end) = end_bytes {
                if !file.meta.key_min.is_empty() && file.meta.key_min.as_slice() >= end.as_slice() {
                    continue;
                }
            }
            let (reader, dv) = open_file(base, file, engine.schema.clone())?;
            let file_rows = reader.scan(
                start_bytes.as_deref(),
                end_bytes.as_deref(),
                read_seq,
                dv.as_ref(),
            )?;
            for (ikey, row) in file_rows {
                let op = ikey.op_type;
                harvest.push((ikey, row, op));
            }
        }
    }
    // Merge: sort so versions of the same user key are adjacent, newest
    // first (see the tag encoding above — NOTE(review): this assumes
    // `InternalKey`'s `Ord` matches that wire ordering; confirm), then keep
    // only the first (newest) version per key and drop tombstones.
    harvest.sort_by(|a, b| a.0.cmp(&b.0));
    let mut results: Vec<(InternalKey, Row)> = Vec::new();
    let mut last_uk: Option<Vec<u8>> = None;
    for (ikey, row, op) in harvest {
        let uk = ikey.user_key_bytes().to_vec();
        if let Some(ref last) = last_uk {
            if *last == uk {
                // An older version of a key already resolved — shadowed.
                continue;
            }
        }
        last_uk = Some(uk);
        if op == OpType::Delete {
            // Newest version is a tombstone: the key is absent from the scan.
            continue;
        }
        results.push((ikey, row));
    }
    debug!(result_count = results.len(), "range scan complete");
    Ok(results)
}