use std::{collections::BTreeMap, fmt, ops::Bound, sync::Arc};
use arrow_array::{ArrayRef, RecordBatch, UInt64Array};
use arrow_schema::{Schema, SchemaRef};
use typed_arrow_dyn::{DynProjection, DynRowRaw, DynSchema, DynViewError};
pub(crate) use crate::mvcc::MVCC_COMMIT_COL;
use crate::{
extractor::{KeyExtractError, map_view_err},
key::{KeyOwned, KeyRow, KeyTsViewRaw},
mvcc::Timestamp,
};
/// Column name used for the MVCC tombstone flag.
///
/// The default scan projection in this file drops any field carrying this name.
pub(crate) const MVCC_TOMBSTONE_COL: &str = "_tombstone";
/// A sealed, read-only memtable: row data in an Arrow batch plus an ordered
/// index of `(key, commit timestamp)` views over it.
#[derive(Debug)]
pub(crate) struct ImmutableMemTable {
// Arrow batch holding the row data; `ImmutableIndexEntry::Row` offsets index into it.
storage: RecordBatch,
// Ordered map from `(key, timestamp)` views to either a storage row or a delete marker.
index: BTreeMap<KeyTsViewRaw, ImmutableIndexEntry>,
// Per-row commit timestamps and tombstone flags, addressed by storage row offset.
mvcc: MvccColumns,
// Key-only delete records together with their commit timestamps.
deletes: DeleteSidecar,
}
/// What an index entry in an [`ImmutableMemTable`] refers to.
#[derive(Clone, Copy, Debug)]
pub(crate) enum ImmutableIndexEntry {
// Offset of a row inside the table's storage batch (and its MVCC columns).
Row(u32),
// A delete marker; visible scans report it as a tombstone.
Delete,
}
/// Delete records kept beside the row data: a batch of keys and one commit
/// timestamp per key row.
#[derive(Clone, Debug)]
pub(crate) struct DeleteSidecar {
// Batch containing the key columns of the deleted entries.
keys: RecordBatch,
// Commit timestamp for each row of `keys` (same length, checked in debug builds by `new`).
commit_ts: Vec<Timestamp>,
}
impl DeleteSidecar {
pub fn new(keys: RecordBatch, commit_ts: Vec<Timestamp>) -> Self {
debug_assert_eq!(keys.num_rows(), commit_ts.len());
Self { keys, commit_ts }
}
pub fn empty(schema: &SchemaRef) -> Self {
Self {
keys: RecordBatch::new_empty(schema.clone()),
commit_ts: Vec::new(),
}
}
pub fn len(&self) -> usize {
self.commit_ts.len()
}
pub fn is_empty(&self) -> bool {
self.commit_ts.is_empty()
}
#[cfg(all(test, feature = "tokio"))]
pub fn commit_ts(&self, idx: usize) -> Timestamp {
self.commit_ts[idx]
}
pub fn key_batch(&self) -> &RecordBatch {
&self.keys
}
pub fn to_record_batch(&self) -> Result<RecordBatch, arrow_schema::ArrowError> {
use arrow_schema::{Field, Schema};
let mut columns: Vec<ArrayRef> = self.keys.columns().to_vec();
let commit_array = UInt64Array::from_iter_values(self.commit_ts.iter().map(|ts| ts.get()));
columns.push(Arc::new(commit_array) as ArrayRef);
let mut fields: Vec<Field> = self
.keys
.schema()
.fields()
.iter()
.map(|f| f.as_ref().clone())
.collect();
fields.push(Field::new(
MVCC_COMMIT_COL,
arrow_schema::DataType::UInt64,
false,
));
let schema = Arc::new(Schema::new(fields));
RecordBatch::try_new(schema, columns)
}
}
impl ImmutableMemTable {
pub(crate) fn new(
storage: RecordBatch,
index: BTreeMap<KeyTsViewRaw, ImmutableIndexEntry>,
mvcc: MvccColumns,
deletes: DeleteSidecar,
) -> Self {
Self {
storage,
index,
mvcc,
deletes,
}
}
#[cfg(test)]
pub(crate) fn len(&self) -> usize {
self.index.len()
}
pub(crate) fn storage(&self) -> &RecordBatch {
&self.storage
}
pub(crate) fn delete_sidecar(&self) -> &DeleteSidecar {
&self.deletes
}
pub(crate) fn has_conflict(&self, key: &KeyOwned, snapshot_ts: Timestamp) -> bool {
let upper = KeyTsViewRaw::from_owned(key, Timestamp::MAX);
let lower = KeyTsViewRaw::from_owned(key, Timestamp::MIN);
self.index
.range(upper..=lower)
.any(|(view, _)| view.timestamp() > snapshot_ts)
}
pub(crate) fn row_iter(&self) -> ImmutableRowIter {
ImmutableRowIter::new(self)
}
pub(crate) fn entry_count(&self) -> usize {
self.index.len()
}
pub(crate) fn min_key(&self) -> Option<KeyOwned> {
self.index.keys().next().map(|view| view.key().to_owned())
}
pub(crate) fn max_key(&self) -> Option<KeyOwned> {
self.index
.keys()
.next_back()
.map(|view| view.key().to_owned())
}
pub(crate) fn mvcc_columns(&self) -> &MvccColumns {
&self.mvcc
}
pub(crate) fn commit_ts_bounds(&self) -> Option<(Timestamp, Timestamp)> {
let mut iter = self.mvcc.commit_ts.iter();
let first = iter.next()?;
let mut min_ts = *first;
let mut max_ts = *first;
for ts in iter {
if *ts < min_ts {
min_ts = *ts;
}
if *ts > max_ts {
max_ts = *ts;
}
}
Some((min_ts, max_ts))
}
fn mvcc_row(&self, row: u32) -> (Timestamp, bool) {
let idx = row as usize;
(self.mvcc.commit_ts[idx], self.mvcc.tombstone[idx])
}
pub(crate) fn scan_visible<'t>(
&'t self,
projection_schema: Option<SchemaRef>,
read_ts: Timestamp,
) -> Result<ImmutableVisibleScan<'t>, KeyExtractError> {
ImmutableVisibleScan::new(self, projection_schema, read_ts, None)
}
pub(crate) fn scan_visible_in_range<'t>(
&'t self,
projection_schema: Option<SchemaRef>,
read_ts: Timestamp,
start: Bound<KeyOwned>,
end: Bound<KeyOwned>,
) -> Result<ImmutableVisibleScan<'t>, KeyExtractError> {
ImmutableVisibleScan::new(self, projection_schema, read_ts, Some((start, end)))
}
}
/// Validates that `commit_ts`, `tombstone` and `batch` all describe the same
/// number of rows and bundles the two vectors into [`MvccColumns`].
///
/// The batch itself is returned unchanged.
///
/// # Errors
/// Returns `ArrowError::ComputeError` when the two vectors differ in length,
/// or when their length differs from the batch's row count.
pub(crate) fn bundle_mvcc_sidecar(
    batch: RecordBatch,
    commit_ts: Vec<Timestamp>,
    tombstone: Vec<bool>,
) -> Result<(RecordBatch, MvccColumns), arrow_schema::ArrowError> {
    use arrow_schema::ArrowError;

    // The vector-vs-vector check is evaluated first, as callers may observe
    // which message is reported.
    let mismatch = if commit_ts.len() != tombstone.len() {
        Some("commit_ts and tombstone length mismatch")
    } else if commit_ts.len() != batch.num_rows() {
        Some("mvcc metadata length mismatch record batch")
    } else {
        None
    };

    match mismatch {
        Some(message) => Err(ArrowError::ComputeError(message.to_string())),
        None => Ok((batch, MvccColumns::new(commit_ts, tombstone))),
    }
}
/// Iterator state for a snapshot scan over an [`ImmutableMemTable`].
///
/// For each key it yields at most one entry: the first index entry whose
/// timestamp does not exceed `read_ts`.
pub(crate) struct ImmutableVisibleScan<'t> {
// Table being scanned.
table: &'t ImmutableMemTable,
// Cursor over the whole index or over a bounded range of it.
iter: ImmutableCursor<'t>,
// Snapshot timestamp; entries newer than this are skipped.
read_ts: Timestamp,
// Key of the entry group currently being walked.
current_key: Option<KeyRow>,
// Whether an entry has already been yielded for `current_key`.
emitted_for_key: bool,
// Dynamic schema built from the storage batch schema.
dyn_schema: DynSchema,
// Projection applied to each yielded row.
projection: DynProjection,
// Owned range-bound keys held for the scan's lifetime.
// NOTE(review): presumably keeps the data behind the raw bound views alive — confirm.
_range_keys: Option<RangeKeyOwners>,
}
/// Owned storage for the start/end keys of a bounded scan.
#[derive(Debug)]
struct RangeKeyOwners {
// Start key, set when the lower bound is `Included` or `Excluded`.
start: Option<KeyOwned>,
// End key, set when the upper bound is `Included` or `Excluded`.
end: Option<KeyOwned>,
}
// Iterator over every entry of the memtable index.
type ImmutableIter<'t> = std::collections::btree_map::Iter<'t, KeyTsViewRaw, ImmutableIndexEntry>;
// Iterator over a bounded sub-range of the memtable index.
type ImmutableRange<'t> = std::collections::btree_map::Range<'t, KeyTsViewRaw, ImmutableIndexEntry>;
/// Unifies full-index and bounded-range iteration behind one iterator type.
enum ImmutableCursor<'t> {
// Walks the whole index.
Iter(ImmutableIter<'t>),
// Walks only the entries inside the requested bounds.
Range(ImmutableRange<'t>),
}
impl<'t> Iterator for ImmutableCursor<'t> {
type Item = (&'t KeyTsViewRaw, &'t ImmutableIndexEntry);
fn next(&mut self) -> Option<Self::Item> {
match self {
Self::Iter(iter) => iter.next(),
Self::Range(range) => range.next(),
}
}
}
/// Converts a lower key bound into a bound over `(key, timestamp)` index views.
///
/// The owned key is parked in `slot` (replacing any previous value) and the
/// returned view is built from that stored key. `Unbounded` leaves `slot`
/// untouched.
///
/// * `Included(key)` pairs the key with `Timestamp::MAX`.
/// * `Excluded(key)` pairs the key with `Timestamp::MIN`.
///
/// NOTE(review): this assumes the index orders `(key, MAX)` before
/// `(key, MIN)`, as the range probe in `has_conflict` also relies on.
fn lower_bound_for_immutable(
    bound: Bound<KeyOwned>,
    slot: &mut Option<KeyOwned>,
) -> Bound<KeyTsViewRaw> {
    match bound {
        Bound::Included(key) => {
            // `Option::insert` stores the key and hands back a reference to it,
            // so no fallback branch is needed.
            let stored: &KeyOwned = slot.insert(key);
            Bound::Included(KeyTsViewRaw::from_owned(stored, Timestamp::MAX))
        }
        Bound::Excluded(key) => {
            let stored: &KeyOwned = slot.insert(key);
            Bound::Excluded(KeyTsViewRaw::from_owned(stored, Timestamp::MIN))
        }
        Bound::Unbounded => Bound::Unbounded,
    }
}
/// Converts an upper key bound into a bound over `(key, timestamp)` index views.
///
/// The owned key is parked in `slot` (replacing any previous value) and the
/// returned view is built from that stored key. `Unbounded` leaves `slot`
/// untouched.
///
/// * `Included(key)` pairs the key with `Timestamp::MIN`.
/// * `Excluded(key)` pairs the key with `Timestamp::MAX`.
///
/// NOTE(review): this assumes the index orders `(key, MAX)` before
/// `(key, MIN)`, as the range probe in `has_conflict` also relies on.
fn upper_bound_for_immutable(
    bound: Bound<KeyOwned>,
    slot: &mut Option<KeyOwned>,
) -> Bound<KeyTsViewRaw> {
    match bound {
        Bound::Included(key) => {
            // `Option::insert` stores the key and hands back a reference to it,
            // so no fallback branch is needed.
            let stored: &KeyOwned = slot.insert(key);
            Bound::Included(KeyTsViewRaw::from_owned(stored, Timestamp::MIN))
        }
        Bound::Excluded(key) => {
            let stored: &KeyOwned = slot.insert(key);
            Bound::Excluded(KeyTsViewRaw::from_owned(stored, Timestamp::MAX))
        }
        Bound::Unbounded => Bound::Unbounded,
    }
}
/// One item produced by a visible scan.
pub(crate) enum ImmutableVisibleEntry {
// A live row: its index view plus the projected row.
Row(KeyTsViewRaw, DynRowRaw),
// The visible version of this key is a delete; only the index view is reported.
Tombstone(KeyTsViewRaw),
}
impl ImmutableVisibleEntry {
    /// Test helper: unwraps a `Row` into its `(view, row)` pair and maps a
    /// `Tombstone` to `None`.
    #[cfg(test)]
    pub(crate) fn into_row(self) -> Option<(KeyTsViewRaw, DynRowRaw)> {
        match self {
            Self::Tombstone(_) => None,
            Self::Row(key, row) => Some((key, row)),
        }
    }
}
impl fmt::Debug for ImmutableVisibleScan<'_> {
    /// Prints only the snapshot timestamp and the per-key emission flag; the
    /// remaining fields are omitted from the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            read_ts,
            emitted_for_key,
            ..
        } = self;
        let mut out = f.debug_struct("ImmutableVisibleScan");
        out.field("read_ts", read_ts);
        out.field("emitted_for_key", emitted_for_key);
        out.finish()
    }
}
impl<'t> ImmutableVisibleScan<'t> {
fn new(
table: &'t ImmutableMemTable,
projection_schema: Option<SchemaRef>,
read_ts: Timestamp,
bounds: Option<(Bound<KeyOwned>, Bound<KeyOwned>)>,
) -> Result<Self, KeyExtractError> {
let base_schema = table.storage.schema();
let dyn_schema = DynSchema::from_ref(base_schema.clone());
let projection = build_projection(&base_schema, projection_schema.as_ref())?;
let (iter, range_keys): (ImmutableCursor<'t>, Option<RangeKeyOwners>) = match bounds {
Some((start, end)) => {
let mut keys = RangeKeyOwners {
start: None,
end: None,
};
let lower = lower_bound_for_immutable(start, &mut keys.start);
let upper = upper_bound_for_immutable(end, &mut keys.end);
(
ImmutableCursor::Range(table.index.range((lower, upper))),
Some(keys),
)
}
None => (ImmutableCursor::Iter(table.index.iter()), None),
};
Ok(Self {
table,
iter,
read_ts,
current_key: None,
emitted_for_key: false,
dyn_schema,
projection,
_range_keys: range_keys,
})
}
}
impl<'t> Iterator for ImmutableVisibleScan<'t> {
    type Item = Result<ImmutableVisibleEntry, DynViewError>;

    /// Yields the next visible entry.
    ///
    /// Entries are consumed in index order. For each run of entries sharing a
    /// key, the first one whose timestamps do not exceed `read_ts` is yielded
    /// (as a row or a tombstone) and the rest of the run is skipped. A
    /// projection failure is yielded as `Err` without marking the key as
    /// emitted.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            let (view, entry) = self.iter.next()?;
            let key_view = view.key();

            let same_key =
                matches!(self.current_key.as_ref(), Some(existing) if existing == key_view);
            if !same_key {
                // A new key group starts here.
                self.current_key = Some(key_view.clone());
                self.emitted_for_key = false;
            } else if self.emitted_for_key {
                // This key already produced its visible entry.
                continue;
            }

            if view.timestamp() > self.read_ts {
                continue;
            }

            let row_idx = match entry {
                ImmutableIndexEntry::Row(row_idx) => *row_idx,
                ImmutableIndexEntry::Delete => {
                    self.emitted_for_key = true;
                    return Some(Ok(ImmutableVisibleEntry::Tombstone(view.clone())));
                }
            };

            // The per-row MVCC metadata is checked as well as the index view.
            let (commit_ts, tombstone) = self.table.mvcc_row(row_idx);
            if commit_ts > self.read_ts {
                continue;
            }
            if tombstone {
                self.emitted_for_key = true;
                return Some(Ok(ImmutableVisibleEntry::Tombstone(view.clone())));
            }

            let projected = self.projection.project_row_raw(
                &self.dyn_schema,
                &self.table.storage,
                row_idx as usize,
            );
            return Some(match projected {
                Ok(row) => {
                    self.emitted_for_key = true;
                    Ok(ImmutableVisibleEntry::Row(view.clone(), row))
                }
                Err(err) => Err(err),
            });
        }
    }
}
/// Builds the row projection used by visible scans.
///
/// With an explicit `projection_schema`, that schema is projected out of
/// `schema`. Without one, every field of `schema` except those named
/// [`MVCC_COMMIT_COL`] or [`MVCC_TOMBSTONE_COL`] is projected.
///
/// # Errors
/// Returns a `ComputeError` when the resulting projection would have no
/// columns, or the mapped error from `DynProjection::from_schema`.
fn build_projection(
    schema: &SchemaRef,
    projection_schema: Option<&SchemaRef>,
) -> Result<DynProjection, KeyExtractError> {
    let no_columns = || {
        KeyExtractError::Arrow(arrow_schema::ArrowError::ComputeError(
            "projection requires at least one column".to_string(),
        ))
    };

    match projection_schema {
        Some(projected) if projected.fields().is_empty() => Err(no_columns()),
        Some(projected) => DynProjection::from_schema(schema.as_ref(), projected.as_ref())
            .map_err(map_view_err),
        None => {
            // Default projection: all columns minus the MVCC bookkeeping ones.
            let mut logical_fields = Vec::new();
            for field in schema.fields().iter() {
                let name = field.name();
                if name == MVCC_COMMIT_COL || name == MVCC_TOMBSTONE_COL {
                    continue;
                }
                logical_fields.push(field.as_ref().clone());
            }
            if logical_fields.is_empty() {
                return Err(no_columns());
            }
            let logical_schema = SchemaRef::new(Schema::new(logical_fields));
            DynProjection::from_schema(schema.as_ref(), logical_schema.as_ref())
                .map_err(map_view_err)
        }
    }
}
/// Owning iterator over every index entry of an [`ImmutableMemTable`],
/// materialized up front.
pub(crate) struct ImmutableRowIter {
// Pre-sorted entries, drained in order.
iter: std::vec::IntoIter<ImmutableRowEntry>,
}
/// Owned description of one index entry, as produced by [`ImmutableRowIter`].
pub(crate) struct ImmutableRowEntry {
// Owned copy of the entry's key.
pub key: KeyOwned,
// Timestamp taken from the index view.
pub commit_ts: Timestamp,
// `true` when the index entry is a `Delete` marker.
pub tombstone: bool,
}
impl ImmutableRowIter {
fn new(table: &ImmutableMemTable) -> Self {
let mut rows: Vec<ImmutableRowEntry> = table
.index
.iter()
.map(|(view, entry)| ImmutableRowEntry {
key: view.key().to_owned(),
commit_ts: view.timestamp(),
tombstone: matches!(entry, ImmutableIndexEntry::Delete),
})
.collect();
rows.sort_by(|a, b| match a.key.cmp(&b.key) {
std::cmp::Ordering::Equal => b.commit_ts.get().cmp(&a.commit_ts.get()),
other => other,
});
Self {
iter: rows.into_iter(),
}
}
}
impl Iterator for ImmutableRowIter {
    type Item = ImmutableRowEntry;

    /// Hands out the next pre-sorted entry.
    fn next(&mut self) -> Option<ImmutableRowEntry> {
        let Self { iter } = self;
        iter.next()
    }
}
/// Per-row MVCC metadata stored beside a record batch.
#[derive(Debug, Clone)]
pub(crate) struct MvccColumns {
// Commit timestamp of each row.
pub commit_ts: Vec<Timestamp>,
// Tombstone flag of each row (same length as `commit_ts`, checked in debug builds by `new`).
pub tombstone: Vec<bool>,
}
impl MvccColumns {
pub fn new(commit_ts: Vec<Timestamp>, tombstone: Vec<bool>) -> Self {
debug_assert_eq!(commit_ts.len(), tombstone.len());
Self {
commit_ts,
tombstone,
}
}
}
#[cfg(all(test, feature = "tokio"))]
pub(crate) use tests::segment_from_batch_with_key_name;
#[cfg(test)]
mod tests {
use arrow_schema::{DataType, Field, Schema};
use typed_arrow_dyn::{DynCell, DynRow};
use super::*;
use crate::{
extractor::{KeyProjection, projection_for_columns, projection_for_field},
inmem::mutable::memtable::DynMem,
test::build_batch,
};
/// Test helper: builds an immutable memtable from `batch`, indexing every row
/// under the key produced by `extractor`.
///
/// Every row gets commit timestamp `Timestamp::MIN` and no tombstone; the
/// delete sidecar is empty.
pub(crate) fn segment_from_batch_with_extractor(
    batch: RecordBatch,
    extractor: &dyn KeyProjection,
) -> Result<ImmutableMemTable, KeyExtractError> {
    extractor.validate_schema(&batch.schema())?;

    let row_count = batch.num_rows();
    let (batch, mvcc) = bundle_mvcc_sidecar(
        batch,
        vec![Timestamp::MIN; row_count],
        vec![false; row_count],
    )
    .map_err(KeyExtractError::from)?;

    let every_row: Vec<usize> = (0..batch.num_rows()).collect();
    let key_rows = extractor.project_view(&batch, &every_row)?;

    let mut index: BTreeMap<KeyTsViewRaw, ImmutableIndexEntry> = BTreeMap::new();
    for (row, key_row) in key_rows.into_iter().enumerate() {
        let view = KeyTsViewRaw::new(key_row, mvcc.commit_ts[row]);
        index.insert(view, ImmutableIndexEntry::Row(row as u32));
    }

    let deletes = DeleteSidecar::empty(&extractor.key_schema());
    Ok(ImmutableMemTable::new(batch, index, mvcc, deletes))
}
/// Test helper: builds an immutable memtable keyed by the column at index
/// `key_col`.
///
/// Returns `KeyExtractError::ColumnOutOfBounds` when `key_col` is not a valid
/// column index of the batch schema.
pub(crate) fn segment_from_batch_with_key_col(
    batch: RecordBatch,
    key_col: usize,
) -> Result<ImmutableMemTable, KeyExtractError> {
    let schema = batch.schema();
    let field_count = schema.fields().len();
    if key_col >= field_count {
        return Err(KeyExtractError::ColumnOutOfBounds(key_col, field_count));
    }
    let extractor = projection_for_field(schema.clone(), key_col)?;
    segment_from_batch_with_extractor(batch, extractor.as_ref())
}
/// Test helper: builds an immutable memtable keyed by the first column named
/// `key_field`.
///
/// Returns `KeyExtractError::NoSuchField` when the schema has no such column.
pub(crate) fn segment_from_batch_with_key_name(
    batch: RecordBatch,
    key_field: &str,
) -> Result<ImmutableMemTable, KeyExtractError> {
    let schema = batch.schema();
    let position = schema
        .fields()
        .iter()
        .position(|field| field.name() == key_field);
    match position {
        Some(idx) => segment_from_batch_with_key_col(batch, idx),
        None => Err(KeyExtractError::NoSuchField {
            name: key_field.to_string(),
        }),
    }
}
/// Test helper: appends an owned copy of `key` to `storage` and returns a
/// `(key, ts)` view built from that freshly stored key.
fn push_view(storage: &mut Vec<KeyOwned>, key: &str, ts: Timestamp) -> KeyTsViewRaw {
    let slot = storage.len();
    storage.push(KeyOwned::from(key));
    // `slot` is the index of the element pushed just above.
    KeyTsViewRaw::from_owned(&storage[slot], ts)
}
/// Builds a segment keyed by the `id` column and checks that keys `>= "b"`
/// come back in order, both by post-filtering a full scan and through the
/// bounded range-scan API.
#[test]
fn scan_ranges_dynamic_key_name() {
    let schema = std::sync::Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ]));
    let rows = vec![
        DynRow(vec![Some(DynCell::Str("a".into())), Some(DynCell::I32(1))]),
        DynRow(vec![Some(DynCell::Str("c".into())), Some(DynCell::I32(2))]),
        DynRow(vec![Some(DynCell::Str("b".into())), Some(DynCell::I32(3))]),
    ];
    let batch: RecordBatch = build_batch(schema.clone(), rows).expect("ok");
    let seg = segment_from_batch_with_key_name(batch, "id").expect("seg");

    // Full scan, filtered after the fact.
    let got: Vec<String> = seg
        .scan_visible(None, Timestamp::MAX)
        .expect("scan visible")
        .filter_map(|res| {
            let entry = res.expect("row projection");
            entry.into_row().map(|(view, _)| view.key().to_owned())
        })
        .filter(|k| k.as_utf8().map(|v| v >= "b").unwrap_or(false))
        .map(|k| k.as_utf8().expect("utf8 key").to_string())
        .collect();
    assert_eq!(got, vec!["b".to_string(), "c".to_string()]);

    // Collects the keys returned by a bounded scan.
    let keys_in_range = |start: std::ops::Bound<KeyOwned>, end: std::ops::Bound<KeyOwned>| {
        seg.scan_visible_in_range(None, Timestamp::MAX, start, end)
            .expect("scan visible in range")
            .filter_map(|res| {
                let entry = res.expect("row projection");
                entry.into_row().map(|(view, _)| view.key().to_owned())
            })
            .map(|k| k.as_utf8().expect("utf8 key").to_string())
            .collect::<Vec<String>>()
    };

    // The same lower bound expressed through the range API.
    let from_b = keys_in_range(
        std::ops::Bound::Included(KeyOwned::from("b")),
        std::ops::Bound::Unbounded,
    );
    assert_eq!(from_b, got);

    // An exclusive upper bound drops the end key.
    let a_to_c = keys_in_range(
        std::ops::Bound::Included(KeyOwned::from("a")),
        std::ops::Bound::Excluded(KeyOwned::from("c")),
    );
    assert_eq!(a_to_c, vec!["a".to_string(), "b".to_string()]);
}
/// Four versions of one key at timestamps 40/30/20/10: a scan must surface
/// exactly one row, namely the newest version not newer than the read
/// timestamp.
#[test]
fn scan_visible_filters_by_timestamp() {
    let schema = std::sync::Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ]));
    // (value, commit timestamp) per storage row, newest first.
    let versions = [(4i32, 40u64), (3, 30), (2, 20), (1, 10)];

    let rows: Vec<DynRow> = versions
        .iter()
        .map(|&(value, _)| {
            DynRow(vec![
                Some(DynCell::Str("k".into())),
                Some(DynCell::I32(value)),
            ])
        })
        .collect();
    let batch: RecordBatch = build_batch(schema.clone(), rows).expect("batch");

    let mut key_storage = Vec::new();
    let mut composite = BTreeMap::new();
    for (row, &(_, ts)) in versions.iter().enumerate() {
        let view = push_view(&mut key_storage, "k", Timestamp::new(ts));
        composite.insert(view, ImmutableIndexEntry::Row(row as u32));
    }

    let commit_ts: Vec<Timestamp> = versions
        .iter()
        .map(|&(_, ts)| Timestamp::new(ts))
        .collect();
    let (batch, mvcc) = bundle_mvcc_sidecar(batch, commit_ts, vec![false; versions.len()])
        .expect("mvcc columns");

    let delete_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
    let seg = ImmutableMemTable::new(
        batch,
        composite,
        mvcc,
        DeleteSidecar::empty(&delete_schema),
    );

    // Scans at `read_ts`, checks a single row is visible, and returns its `v`.
    let visible_value_at = |read_ts: u64| -> i32 {
        let visible: Vec<_> = seg
            .scan_visible(None, Timestamp::new(read_ts))
            .expect("scan visible")
            .filter_map(|res| {
                let entry = res.expect("row projection");
                entry
                    .into_row()
                    .map(|(_, row)| row.into_owned().expect("row"))
            })
            .collect();
        assert_eq!(visible.len(), 1);
        match visible[0].0[1].as_ref() {
            Some(DynCell::I32(v)) => *v,
            _ => panic!("unexpected cell"),
        }
    };

    assert_eq!(visible_value_at(15), 1);
    assert_eq!(visible_value_at(45), 4);
}
/// One key with a row at ts 30, a delete marker at ts 20 and a row at ts 10:
/// reading at ts 21 must surface no rows because the delete is the newest
/// visible entry.
#[test]
fn scan_visible_skips_tombstones() {
    let schema = std::sync::Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ]));
    // (value, commit timestamp, indexed as delete marker) per storage row.
    let versions = [(3i32, 30u64, false), (2, 20, true), (1, 10, false)];

    let rows: Vec<DynRow> = versions
        .iter()
        .map(|&(value, _, _)| {
            DynRow(vec![
                Some(DynCell::Str("k".into())),
                Some(DynCell::I32(value)),
            ])
        })
        .collect();
    let batch: RecordBatch = build_batch(schema.clone(), rows).expect("batch");

    let mut key_storage = Vec::new();
    let mut composite = BTreeMap::new();
    for (row, &(_, ts, is_delete)) in versions.iter().enumerate() {
        let view = push_view(&mut key_storage, "k", Timestamp::new(ts));
        let entry = match is_delete {
            true => ImmutableIndexEntry::Delete,
            false => ImmutableIndexEntry::Row(row as u32),
        };
        composite.insert(view, entry);
    }

    let commit_ts: Vec<Timestamp> = versions
        .iter()
        .map(|&(_, ts, _)| Timestamp::new(ts))
        .collect();
    let (batch, mvcc) = bundle_mvcc_sidecar(batch, commit_ts, vec![false; versions.len()])
        .expect("mvcc columns");

    let delete_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
    let delete_batch: RecordBatch = build_batch(
        delete_schema,
        vec![DynRow(vec![Some(DynCell::Str("k".into()))])],
    )
    .expect("delete");
    let delete_sidecar = DeleteSidecar::new(delete_batch, vec![Timestamp::new(20)]);
    let seg = ImmutableMemTable::new(batch, composite, mvcc, delete_sidecar);

    // The storage batch must not carry MVCC bookkeeping columns.
    let storage_schema = seg.storage().schema();
    let has_mvcc_column = storage_schema.fields().iter().any(|field| {
        let name = field.name();
        name == MVCC_COMMIT_COL || name == MVCC_TOMBSTONE_COL
    });
    assert!(
        !has_mvcc_column,
        "unexpected _commit_ts column in immutable storage"
    );

    let visible_rows = seg
        .scan_visible(None, Timestamp::new(21))
        .expect("scan visible")
        .filter_map(|res| {
            let entry = res.expect("row projection");
            entry
                .into_row()
                .map(|(_, row)| row.into_owned().expect("row"))
        })
        .count();
    assert_eq!(
        visible_rows, 0,
        "tombstoned rows should be hidden once delete is visible"
    );
}
// End-to-end check through the mutable memtable: insert key "k" at ts 10,
// delete it at ts 20, re-insert at ts 30, seal, then scan the sealed segment
// at two snapshots.
#[test]
fn scan_visible_handles_delete_then_reinsert() {
let schema = std::sync::Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
]));
// Key projection over column 0 (`id`) of the row schema.
let extractor: std::sync::Arc<dyn crate::extractor::KeyProjection> =
projection_for_field(schema.clone(), 0)
.expect("mutable delete test extractor")
.into();
// Delete batches carry the key column plus a commit-timestamp column.
let delete_schema = std::sync::Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new(MVCC_COMMIT_COL, DataType::UInt64, false),
]));
let delete_projection: std::sync::Arc<dyn crate::extractor::KeyProjection> =
projection_for_columns(delete_schema.clone(), vec![0])
.expect("delete projection")
.into();
let mutable = DynMem::new(schema.clone(), extractor, delete_projection);
// Inserts a single row ("k", value) committed at `ts`.
let insert_value = |mem: &DynMem, value: i32, ts: u64| {
let rows = vec![DynRow(vec![
Some(DynCell::Str("k".into())),
Some(DynCell::I32(value)),
])];
let batch: RecordBatch = build_batch(schema.clone(), rows).expect("insert batch");
mem.insert_batch(batch, Timestamp::new(ts))
.expect("insert value");
};
insert_value(&mutable, 1, 10);
// Delete record for "k"; the second cell (20) fills the commit-timestamp column.
let delete_rows = vec![DynRow(vec![
Some(DynCell::Str("k".into())),
Some(DynCell::U64(20)),
])];
let delete_batch: RecordBatch =
build_batch(delete_schema.clone(), delete_rows).expect("delete batch");
mutable.insert_delete_batch(delete_batch).expect("delete");
insert_value(&mutable, 3, 30);
// Seal the mutable memtable into the segment under test.
let segment = mutable
.seal_now()
.expect("seal immutable")
.expect("segment");
// Snapshot at ts 25: after the delete (20), before the re-insert (30).
let rows_after_delete: Vec<DynRow> = segment
.scan_visible(None, Timestamp::new(25))
.expect("scan after delete")
.filter_map(|res| {
let entry = res.expect("row projection");
entry
.into_row()
.map(|(_, row)| row.into_owned().expect("row"))
})
.collect();
assert!(
rows_after_delete.is_empty(),
"rows at or below delete_ts should be hidden"
);
// Snapshot at the maximum timestamp: the re-inserted value must be the one visible.
let rows_after_reinsert: Vec<DynRow> = segment
.scan_visible(None, Timestamp::MAX)
.expect("scan after reinsert")
.filter_map(|res| {
let entry = res.expect("row projection");
entry
.into_row()
.map(|(_, row)| row.into_owned().expect("row"))
})
.collect();
assert_eq!(rows_after_reinsert.len(), 1);
match rows_after_reinsert[0].0[1].as_ref() {
Some(DynCell::I32(v)) => assert_eq!(v, &3),
other => panic!("unexpected cell {other:?}"),
}
}
/// When the storage schema itself contains the MVCC columns, a scan without
/// an explicit projection must return only the two logical columns.
#[test]
fn default_projection_omits_mvcc_columns() {
    let fields = vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
        Field::new(MVCC_COMMIT_COL, DataType::UInt64, false),
        Field::new(MVCC_TOMBSTONE_COL, DataType::Boolean, false),
    ];
    let schema = std::sync::Arc::new(Schema::new(fields));
    let only_row = DynRow(vec![
        Some(DynCell::Str("k".into())),
        Some(DynCell::I32(1)),
        Some(DynCell::U64(10)),
        Some(DynCell::Bool(false)),
    ]);
    let batch: RecordBatch = build_batch(schema.clone(), vec![only_row]).expect("batch");

    let mut key_storage = Vec::new();
    let mut composite = BTreeMap::new();
    composite.insert(
        push_view(&mut key_storage, "k", Timestamp::new(10)),
        ImmutableIndexEntry::Row(0),
    );

    let mvcc = MvccColumns::new(vec![Timestamp::new(10)], vec![false]);
    let key_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
    let seg = ImmutableMemTable::new(batch, composite, mvcc, DeleteSidecar::empty(&key_schema));

    let mut scan = seg
        .scan_visible(None, Timestamp::MAX)
        .expect("scan visible");
    let entry = scan.next().expect("row present").expect("row projection");
    let ImmutableVisibleEntry::Row(_, row) = entry else {
        panic!("expected row entry");
    };
    let owned = row.into_owned().expect("row owned");
    assert_eq!(
        owned.0.len(),
        2,
        "mvcc columns leaked into default projection"
    );
}
/// Checks that `row_iter` reports every index entry ordered by key and then
/// by descending timestamp, flags delete markers as tombstones, and that
/// `min_key`/`max_key`/`len` agree with the index contents.
#[test]
fn row_iter_exposes_mvcc_and_bounds() {
    let schema = std::sync::Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ]));
    // (key, value, commit timestamp) per storage row; "b" is indexed as a delete.
    let versions = [("a", 10i32, 30u64), ("b", 8, 20), ("a", 9, 10)];

    let rows: Vec<DynRow> = versions
        .iter()
        .map(|&(key, value, _)| {
            DynRow(vec![
                Some(DynCell::Str(key.into())),
                Some(DynCell::I32(value)),
            ])
        })
        .collect();
    let batch: RecordBatch = build_batch(schema.clone(), rows).expect("batch");

    let mut key_storage = Vec::new();
    let mut composite = BTreeMap::new();
    for (row, &(key, _, ts)) in versions.iter().enumerate() {
        let view = push_view(&mut key_storage, key, Timestamp::new(ts));
        let entry = match key {
            "b" => ImmutableIndexEntry::Delete,
            _ => ImmutableIndexEntry::Row(row as u32),
        };
        composite.insert(view, entry);
    }

    let commit_ts: Vec<Timestamp> = versions
        .iter()
        .map(|&(_, _, ts)| Timestamp::new(ts))
        .collect();
    let (batch, mvcc) = bundle_mvcc_sidecar(batch, commit_ts, vec![false; versions.len()])
        .expect("mvcc columns");

    let delete_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
    let delete_batch: RecordBatch = build_batch(
        delete_schema,
        vec![DynRow(vec![Some(DynCell::Str("b".into()))])],
    )
    .expect("delete");
    let delete_sidecar = DeleteSidecar::new(delete_batch, vec![Timestamp::new(20)]);
    let seg = ImmutableMemTable::new(batch, composite, mvcc, delete_sidecar);

    let mut got: Vec<(String, u64, bool)> = Vec::new();
    for entry in seg.row_iter() {
        let key = entry.key.as_utf8().expect("utf8 key").to_string();
        got.push((key, entry.commit_ts.get(), entry.tombstone));
    }
    let expected = vec![
        ("a".to_string(), 30, false),
        ("a".to_string(), 10, false),
        ("b".to_string(), 20, true),
    ];
    assert_eq!(got, expected);

    let key_text = |key: Option<KeyOwned>| key.map(|k| k.as_utf8().expect("utf8 key").to_string());
    let min_key = key_text(seg.min_key());
    let max_key = key_text(seg.max_key());
    assert_eq!(min_key.as_deref(), Some("a"));
    assert_eq!(max_key.as_deref(), Some("b"));
    assert_eq!(seg.len(), 3);
}
}