#![allow(dead_code)]
use std::{
cmp::Ordering,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use arrow_array::{Array, RecordBatch, UInt64Array};
use fusio::executor::Executor;
use fusio_parquet::reader::AsyncReader;
use futures::{Stream, ready};
use parquet::{arrow::async_reader::ParquetRecordBatchStream, errors::ParquetError};
use pin_project_lite::pin_project;
use thiserror::Error;
use typed_arrow_dyn::{DynProjection, DynRow, DynRowRaw, DynSchema, DynViewError};
/// Transparent newtype around an executor that is unconditionally `Unpin`.
///
/// The wrapper never projects a pin onto the inner executor (every forwarded call in
/// its `Executor` impl goes through `&self`), so opting into `Unpin` is sound. It is
/// used as the executor type of the `AsyncReader` behind `ParquetStream`.
/// `Clone` is available whenever the inner executor is `Clone`.
#[repr(transparent)]
#[derive(Clone)]
pub(crate) struct UnpinExec<E>(pub E);

impl<E> Unpin for UnpinExec<E> {}
impl<E: Executor> Executor for UnpinExec<E> {
type JoinHandle<R>
= E::JoinHandle<R>
where
R: fusio::MaybeSend;
type Mutex<T>
= E::Mutex<T>
where
T: fusio::MaybeSend + fusio::MaybeSync;
type RwLock<T>
= E::RwLock<T>
where
T: fusio::MaybeSend + fusio::MaybeSync;
fn spawn<F>(&self, future: F) -> Self::JoinHandle<F::Output>
where
F: Future + fusio::MaybeSend + 'static,
F::Output: fusio::MaybeSend,
{
self.0.spawn(future)
}
fn mutex<T>(value: T) -> Self::Mutex<T>
where
T: fusio::MaybeSend + fusio::MaybeSync,
{
E::mutex(value)
}
fn rw_lock<T>(value: T) -> Self::RwLock<T>
where
T: fusio::MaybeSend + fusio::MaybeSync,
{
E::rw_lock(value)
}
}
/// Async parquet record-batch stream read through fusio's `AsyncReader`.
///
/// The executor is wrapped in `UnpinExec`, presumably so the reader (and therefore the
/// stream) is `Unpin` regardless of `E` — NOTE(review): confirm against `AsyncReader`.
pub(crate) type ParquetStream<E> = ParquetRecordBatchStream<AsyncReader<UnpinExec<E>>>;
use crate::{
extractor::{KeyExtractError, KeyProjection},
inmem::immutable::memtable::{MVCC_COMMIT_COL, MvccColumns},
key::{KeyOwned, KeyTsOwned, KeyTsViewRaw},
mvcc::Timestamp,
query::stream::Order,
};
/// Errors produced while scanning an SSTable data file and its optional delete sidecar.
#[derive(Debug, Error)]
pub enum SstableScanError {
    /// Parquet decoding failed, or a required MVCC commit-timestamp column was missing
    /// or invalid (those cases are reported as `ParquetError::ArrowError`).
    #[error(transparent)]
    Parquet(#[from] ParquetError),
    /// Projecting a row's key columns failed, or row/commit-timestamp counts disagreed.
    #[error("key extraction failed: {0}")]
    Key(#[from] KeyExtractError),
    /// Projecting a row into a dynamic row view failed.
    #[error("dynamic view of record batch failure: {0}")]
    DynView(#[from] DynViewError),
    /// Converting a projected key row into an owned key failed.
    #[error("key owned conversion failed: {0}")]
    KeyOwned(#[from] crate::key::KeyOwnedError),
}
/// A visible data row yielded by an SSTable scan.
///
/// `key_ts` and `row` are raw views projected from a record batch; that batch is
/// retained here through an `Arc`, presumably so the raw views stay backed by live
/// data for as long as this value exists.
pub struct SstableRowRef {
    // Held only to keep the batch behind `key_ts` / `row` referenced.
    _batch: Arc<RecordBatch>,
    key_ts: KeyTsViewRaw,
    row: DynRowRaw,
}

impl SstableRowRef {
    /// Bundles a projected row and its key/timestamp view with the batch backing both.
    pub(crate) fn new(batch: Arc<RecordBatch>, key_ts: KeyTsViewRaw, row: DynRowRaw) -> Self {
        SstableRowRef {
            key_ts,
            row,
            _batch: batch,
        }
    }

    /// Key columns and commit timestamp of this row.
    pub fn key_ts(&self) -> &KeyTsViewRaw {
        &self.key_ts
    }

    /// Projected row contents, as a raw view.
    pub fn row(&self) -> &DynRowRaw {
        &self.row
    }

    /// Converts the raw row view into an owned `DynRow`.
    ///
    /// # Errors
    /// Propagates the `DynViewError` returned by the conversion.
    pub fn into_row_owned(self) -> Result<DynRow, DynViewError> {
        self.row.into_owned()
    }
}

impl std::fmt::Debug for SstableRowRef {
    /// Prints only `key_ts`; the row payload is intentionally left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("SstableRowRef");
        out.field("key_ts", &self.key_ts);
        out.finish()
    }
}
/// One deduplicated item yielded by an SSTable scan; at most one is produced per key.
#[derive(Debug)]
pub enum SstableVisibleEntry {
    /// The version chosen for the key is a live data row.
    Row(SstableRowRef),
    /// The version chosen for the key is a delete: the key plus the delete's commit
    /// timestamp.
    Tombstone(KeyTsOwned),
}
/// One-entry lookahead for the data side of the merge: a row already decoded from a
/// data batch and checked against `read_ts`, waiting to be compared with the delete side.
struct DataEntry {
    // Batch the raw views below were projected from; moved into `SstableRowRef` on emit.
    batch: Arc<RecordBatch>,
    // Raw key view plus the row's commit timestamp.
    key_ts: KeyTsViewRaw,
    // Projected row columns (raw view).
    row: DynRowRaw,
    // Owned key, compared against delete keys and used for per-key dedup.
    key_owned: KeyOwned,
}
/// One-entry lookahead for the delete side of the merge: a deleted key and the commit
/// timestamp of that delete, already checked against `read_ts`.
struct DeleteEntry {
    key: KeyOwned,
    ts: Timestamp,
}
/// A delete-sidecar stream paired with the key projection used to read keys from it.
///
/// The sidecar is read with its own projection, separate from the data file's, since
/// its schema can differ (e.g. key columns plus the commit-timestamp column only).
pub(crate) struct DeleteStreamWithExtractor<E: Executor> {
    pub stream: ParquetStream<E>,
    pub extractor: Arc<dyn KeyProjection>,
}
// Merging SSTable scan: walks a data file and an optional delete sidecar in lock-step
// by key, skips versions committed after `read_ts`, and yields at most one
// `SstableVisibleEntry` (row or tombstone) per key.
pin_project! {
    pub(crate) struct SstableScan<E>
    where
        E: Executor,
    {
        // Data file stream, polled for record batches.
        #[pin]
        data_stream: ParquetStream<E>,
        // Delete sidecar stream; reset to `None` once it reports end-of-stream.
        #[pin]
        delete_stream: Option<ParquetStream<E>>,
        // Row iterator over the data batch currently being consumed.
        data_iter: Option<DataBatchIterator>,
        // Row iterator over the delete batch currently being consumed.
        delete_iter: Option<DeleteBatchIterator>,
        // One-entry lookahead per side, compared by key to pick what to emit next.
        peeked_data: Option<DataEntry>,
        peeked_delete: Option<DeleteEntry>,
        // Column indices exposed through each emitted row.
        projection_indices: Vec<usize>,
        // Key projections for the data file and the delete sidecar respectively.
        data_extractor: Arc<dyn KeyProjection>,
        delete_extractor: Option<Arc<dyn KeyProjection>>,
        // Forwarded to `DataBatchIterator::new`, which currently ignores it.
        order: Option<Order>,
        // Snapshot timestamp: entries with a larger commit timestamp are skipped.
        read_ts: Timestamp,
        // Key of the most recently processed entry, and whether an entry was already
        // emitted for it (only the first entry per key is emitted).
        current_key: Option<KeyOwned>,
        emitted_for_key: bool,
    }
}
impl<E> SstableScan<E>
where
    E: Executor + Clone + 'static,
{
    /// Builds a scan over `data_stream`, optionally merged with a delete sidecar.
    ///
    /// * `delete_stream` – sidecar stream plus the key projection matching its schema.
    /// * `data_extractor` – key projection matching the data file's schema.
    /// * `projection_indices` – data-batch columns exposed through emitted rows.
    /// * `order` – forwarded to the per-batch row iterator.
    /// * `read_ts` – entries with a commit timestamp above this are invisible.
    pub fn new(
        data_stream: ParquetStream<E>,
        delete_stream: Option<DeleteStreamWithExtractor<E>>,
        data_extractor: Arc<dyn KeyProjection>,
        projection_indices: Vec<usize>,
        order: Option<Order>,
        read_ts: Timestamp,
    ) -> Self {
        // Split the optional (stream, extractor) pair into two optional fields.
        let (delete_stream, delete_extractor) = delete_stream
            .map(|sidecar| (sidecar.stream, sidecar.extractor))
            .unzip();
        Self {
            data_stream,
            delete_stream,
            delete_extractor,
            data_extractor,
            projection_indices,
            order,
            read_ts,
            data_iter: None,
            delete_iter: None,
            peeked_data: None,
            peeked_delete: None,
            current_key: None,
            emitted_for_key: false,
        }
    }
}
impl<E> Stream for SstableScan<E>
where
E: Executor + Clone + 'static,
{
type Item = Result<SstableVisibleEntry, SstableScanError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
if this.peeked_data.is_none() {
match poll_next_data_entry(
this.data_stream.as_mut(),
this.data_iter,
&*this.data_extractor,
this.projection_indices,
*this.order,
*this.read_ts,
cx,
) {
Poll::Ready(Some(Ok(entry))) => *this.peeked_data = Some(entry),
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(None) => {} Poll::Pending => return Poll::Pending,
}
}
if this.peeked_delete.is_none()
&& let Some(delete_stream_pin) = this.delete_stream.as_mut().as_pin_mut()
{
let delete_extractor = this
.delete_extractor
.as_ref()
.expect("delete extractor must be set when delete stream exists");
match poll_next_delete_entry(
delete_stream_pin,
this.delete_iter,
delete_extractor,
*this.read_ts,
cx,
) {
Poll::Ready(Some(Ok(entry))) => *this.peeked_delete = Some(entry),
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(None) => {
this.delete_stream.set(None);
}
Poll::Pending => return Poll::Pending,
}
}
let peeked_data = this.peeked_data.take();
let peeked_delete = this.peeked_delete.take();
match (peeked_data, peeked_delete) {
(None, None) => {
return Poll::Ready(None);
}
(Some(data), None) => {
if let Some(entry) =
process_data_entry(data, this.current_key, this.emitted_for_key)
{
return Poll::Ready(Some(Ok(entry)));
}
}
(None, Some(delete)) => {
if let Some(entry) =
process_delete_entry(delete, this.current_key, this.emitted_for_key)
{
return Poll::Ready(Some(Ok(entry)));
}
}
(Some(data), Some(delete)) => {
match data.key_owned.cmp(&delete.key) {
Ordering::Less => {
*this.peeked_delete = Some(delete);
if let Some(entry) =
process_data_entry(data, this.current_key, this.emitted_for_key)
{
return Poll::Ready(Some(Ok(entry)));
}
}
Ordering::Greater => {
*this.peeked_data = Some(data);
if let Some(entry) =
process_delete_entry(delete, this.current_key, this.emitted_for_key)
{
return Poll::Ready(Some(Ok(entry)));
}
}
Ordering::Equal => {
let data_ts = data.key_ts.timestamp();
if delete.ts >= data_ts {
if let Some(entry) = process_delete_entry(
delete,
this.current_key,
this.emitted_for_key,
) {
return Poll::Ready(Some(Ok(entry)));
}
} else {
if let Some(entry) =
process_data_entry(data, this.current_key, this.emitted_for_key)
{
return Poll::Ready(Some(Ok(entry)));
}
}
}
}
}
}
}
}
}
fn process_data_entry(
data: DataEntry,
current_key: &mut Option<KeyOwned>,
emitted_for_key: &mut bool,
) -> Option<SstableVisibleEntry> {
let is_new_key = current_key
.as_ref()
.map(|k| k != &data.key_owned)
.unwrap_or(true);
if is_new_key {
*current_key = Some(data.key_owned.clone());
*emitted_for_key = false;
}
if *emitted_for_key {
return None; }
*emitted_for_key = true;
let row_ref = SstableRowRef::new(data.batch, data.key_ts, data.row);
Some(SstableVisibleEntry::Row(row_ref))
}
fn process_delete_entry(
delete: DeleteEntry,
current_key: &mut Option<KeyOwned>,
emitted_for_key: &mut bool,
) -> Option<SstableVisibleEntry> {
let is_new_key = current_key
.as_ref()
.map(|k| k != &delete.key)
.unwrap_or(true);
if is_new_key {
*current_key = Some(delete.key.clone());
*emitted_for_key = false;
}
if *emitted_for_key {
return None; }
*emitted_for_key = true;
let key_ts_owned = KeyTsOwned::new(delete.key, delete.ts);
Some(SstableVisibleEntry::Tombstone(key_ts_owned))
}
/// Produces the next data row visible at `read_ts`. When the current batch iterator
/// is exhausted, the next record batch is pulled from `data_stream`.
///
/// Rows whose commit timestamp is greater than `read_ts` are skipped. Returns
/// `Ready(None)` when the stream ends. Errors from the stream, MVCC decoding or row
/// decoding are returned as `Ready(Some(Err(_)))`; a failed row does not stop later
/// calls from continuing with the following rows.
fn poll_next_data_entry<E: Executor + Clone + 'static>(
    mut data_stream: Pin<&mut ParquetStream<E>>,
    data_iter: &mut Option<DataBatchIterator>,
    extractor: &Arc<dyn KeyProjection>,
    projection_indices: &[usize],
    order: Option<Order>,
    read_ts: Timestamp,
    cx: &mut Context<'_>,
) -> Poll<Option<Result<DataEntry, SstableScanError>>> {
    loop {
        if let Some(rows) = data_iter.as_mut() {
            for decoded in rows.by_ref() {
                let (batch, key_ts, row, key_owned) = match decoded {
                    Ok(parts) => parts,
                    Err(err) => return Poll::Ready(Some(Err(err))),
                };
                // Versions committed after the snapshot are invisible.
                if key_ts.timestamp() > read_ts {
                    continue;
                }
                return Poll::Ready(Some(Ok(DataEntry {
                    batch,
                    key_ts,
                    row,
                    key_owned,
                })));
            }
            *data_iter = None;
        }

        let batch = match ready!(data_stream.as_mut().poll_next(cx)) {
            Some(Ok(batch)) => batch,
            Some(Err(err)) => return Poll::Ready(Some(Err(SstableScanError::Parquet(err)))),
            None => return Poll::Ready(None),
        };
        let built = decode_mvcc_from_data(&batch).and_then(|mvcc| {
            DataBatchIterator::new(
                batch,
                projection_indices.to_vec(),
                Arc::clone(extractor),
                mvcc,
                order,
            )
        });
        match built {
            Ok(rows) => *data_iter = Some(rows),
            Err(err) => return Poll::Ready(Some(Err(err))),
        }
    }
}
/// Produces the next delete visible at `read_ts`. When the current batch iterator is
/// exhausted, the next record batch is pulled from `delete_stream`.
///
/// Deletes with a commit timestamp greater than `read_ts` are skipped. Returns
/// `Ready(None)` when the stream ends; stream and decoding errors are returned as
/// `Ready(Some(Err(_)))`.
fn poll_next_delete_entry<E: Executor + Clone + 'static>(
    mut delete_stream: Pin<&mut ParquetStream<E>>,
    delete_iter: &mut Option<DeleteBatchIterator>,
    extractor: &Arc<dyn KeyProjection>,
    read_ts: Timestamp,
    cx: &mut Context<'_>,
) -> Poll<Option<Result<DeleteEntry, SstableScanError>>> {
    loop {
        if let Some(deletes) = delete_iter.as_mut() {
            for decoded in deletes.by_ref() {
                let (key, ts) = match decoded {
                    Ok(pair) => pair,
                    Err(err) => return Poll::Ready(Some(Err(err))),
                };
                // Deletes committed after the snapshot are invisible.
                if ts > read_ts {
                    continue;
                }
                return Poll::Ready(Some(Ok(DeleteEntry { key, ts })));
            }
            *delete_iter = None;
        }

        let batch = match ready!(delete_stream.as_mut().poll_next(cx)) {
            Some(Ok(batch)) => batch,
            Some(Err(err)) => return Poll::Ready(Some(Err(SstableScanError::Parquet(err)))),
            None => return Poll::Ready(None),
        };
        match DeleteBatchIterator::new(batch, Arc::clone(extractor)) {
            Ok(deletes) => *delete_iter = Some(deletes),
            Err(err) => return Poll::Ready(Some(Err(err))),
        }
    }
}
/// Row-by-row decoder over one data record batch, yielding rows in batch order.
struct DataBatchIterator {
    // Shared with every emitted row so its raw views remain backed by this batch.
    batch: Arc<RecordBatch>,
    extractor: Arc<dyn KeyProjection>,
    dyn_schema: DynSchema,
    projection: DynProjection,
    // MVCC columns; `commit_ts` has exactly one entry per row (checked in `new`).
    mvcc: MvccColumns,
    // Index of the next row to decode, and how many rows are still pending.
    offset: usize,
    remaining: usize,
}

impl DataBatchIterator {
    /// Prepares to decode `record_batch`, projecting `projection_indices` for each row.
    ///
    /// `_order` is accepted but unused; rows are always produced in batch order.
    ///
    /// # Errors
    /// * `SstableScanError::Key` if `mvcc.commit_ts` does not have one entry per row.
    /// * The error produced by `crate::extractor::map_view_err` if the projection is
    ///   invalid for the batch schema.
    pub(crate) fn new(
        record_batch: RecordBatch,
        projection_indices: Vec<usize>,
        extractor: Arc<dyn KeyProjection>,
        mvcc: MvccColumns,
        _order: Option<Order>,
    ) -> Result<Self, SstableScanError> {
        let row_count = record_batch.num_rows();
        let commit_count = mvcc.commit_ts.len();
        if commit_count != row_count {
            let mismatch = KeyExtractError::TombstoneLengthMismatch {
                expected: row_count,
                actual: commit_count,
            };
            return Err(SstableScanError::Key(mismatch));
        }

        let schema = record_batch.schema();
        let dyn_schema = DynSchema::from_ref(Arc::clone(&schema));
        let projection = DynProjection::from_indices(schema.as_ref(), projection_indices)
            .map_err(crate::extractor::map_view_err)?;

        Ok(Self {
            batch: Arc::new(record_batch),
            extractor,
            dyn_schema,
            projection,
            mvcc,
            offset: 0,
            remaining: row_count,
        })
    }
}
impl Iterator for DataBatchIterator {
    type Item = Result<(Arc<RecordBatch>, KeyTsViewRaw, DynRowRaw, KeyOwned), SstableScanError>;

    /// Decodes the next row. The cursor advances even when decoding fails, so
    /// iteration can continue past a bad row.
    fn next(&mut self) -> Option<Self::Item> {
        if self.remaining == 0 {
            return None;
        }
        let row_idx = self.offset;
        self.offset += 1;
        self.remaining -= 1;
        Some(self.decode_row(row_idx))
    }
}

impl DataBatchIterator {
    /// Decodes row `row_idx` into (batch handle, key + commit-ts view, projected row,
    /// owned key).
    fn decode_row(
        &self,
        row_idx: usize,
    ) -> Result<(Arc<RecordBatch>, KeyTsViewRaw, DynRowRaw, KeyOwned), SstableScanError> {
        let key_row = self
            .extractor
            .project_view(&self.batch, &[row_idx])
            .map_err(SstableScanError::Key)?
            .into_iter()
            .next()
            .ok_or_else(|| {
                SstableScanError::Key(KeyExtractError::RowOutOfBounds(
                    row_idx,
                    self.batch.num_rows(),
                ))
            })?;
        let key_owned = KeyOwned::from_key_row(&key_row).map_err(SstableScanError::KeyOwned)?;
        let commit_ts = self.mvcc.commit_ts.get(row_idx).copied().ok_or_else(|| {
            SstableScanError::Key(KeyExtractError::RowOutOfBounds(
                row_idx,
                self.mvcc.commit_ts.len(),
            ))
        })?;
        let row = self
            .projection
            .project_row_raw(&self.dyn_schema, &self.batch, row_idx)
            .map_err(SstableScanError::DynView)?;
        Ok((
            Arc::clone(&self.batch),
            KeyTsViewRaw::new(key_row, commit_ts),
            row,
            key_owned,
        ))
    }
}
/// Row-by-row decoder over one delete-sidecar record batch, yielding
/// `(key, commit_ts)` pairs in batch order.
struct DeleteBatchIterator {
    batch: RecordBatch,
    extractor: Arc<dyn KeyProjection>,
    // Commit timestamps of the deletes; guaranteed null-free by `new`.
    commit_col: UInt64Array,
    // Index of the next row to decode, and how many rows are still pending.
    offset: usize,
    remaining: usize,
}

impl DeleteBatchIterator {
    /// Prepares to decode a delete-sidecar batch.
    ///
    /// # Errors
    /// Returns `SstableScanError::Parquet` when the `MVCC_COMMIT_COL` column is missing,
    /// is not `UInt64`, or contains nulls. Nulls are rejected, matching the data-side
    /// check in `decode_mvcc_from_data`. `UInt64Array::value` on a null slot returns
    /// an arbitrary value, which would silently give the delete a bogus timestamp.
    pub(crate) fn new(
        batch: RecordBatch,
        extractor: Arc<dyn KeyProjection>,
    ) -> Result<Self, SstableScanError> {
        let commit_col = batch
            .column_by_name(MVCC_COMMIT_COL)
            .and_then(|arr| arr.as_any().downcast_ref::<UInt64Array>())
            .ok_or_else(|| {
                SstableScanError::Parquet(ParquetError::ArrowError(
                    "delete sidecar missing commit_ts column".into(),
                ))
            })?
            .clone();
        if commit_col.null_count() > 0 {
            return Err(SstableScanError::Parquet(ParquetError::ArrowError(
                "delete sidecar commit_ts column contains nulls".into(),
            )));
        }
        let num_rows = batch.num_rows();
        Ok(Self {
            batch,
            extractor,
            commit_col,
            offset: 0,
            remaining: num_rows,
        })
    }
}
impl Iterator for DeleteBatchIterator {
    type Item = Result<(KeyOwned, Timestamp), SstableScanError>;

    /// Decodes the next delete. The cursor advances even when decoding fails.
    fn next(&mut self) -> Option<Self::Item> {
        if self.remaining == 0 {
            return None;
        }
        let row_idx = self.offset;
        self.offset += 1;
        self.remaining -= 1;
        Some(self.decode_delete(row_idx))
    }
}

impl DeleteBatchIterator {
    /// Extracts the owned key and the commit timestamp of delete row `row_idx`.
    fn decode_delete(&self, row_idx: usize) -> Result<(KeyOwned, Timestamp), SstableScanError> {
        let key_row = self
            .extractor
            .project_view(&self.batch, &[row_idx])
            .map_err(SstableScanError::Key)?
            .into_iter()
            .next()
            .ok_or_else(|| {
                SstableScanError::Key(KeyExtractError::RowOutOfBounds(
                    row_idx,
                    self.batch.num_rows(),
                ))
            })?;
        let key = KeyOwned::from_key_row(&key_row).map_err(SstableScanError::KeyOwned)?;
        let ts = Timestamp::new(self.commit_col.value(row_idx));
        Ok((key, ts))
    }
}
fn decode_mvcc_from_data(batch: &RecordBatch) -> Result<MvccColumns, SstableScanError> {
let commit_binding = batch.column_by_name(MVCC_COMMIT_COL);
let commit = commit_binding
.as_ref()
.and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
.ok_or_else(|| {
SstableScanError::Parquet(ParquetError::ArrowError(
"commit_ts column missing or not UInt64".into(),
))
})?;
if commit.null_count() > 0 {
return Err(SstableScanError::Parquet(ParquetError::ArrowError(
"commit_ts column contains nulls".into(),
)));
}
let mut commit_ts = Vec::with_capacity(commit.len());
for i in 0..commit.len() {
commit_ts.push(Timestamp::new(commit.value(i)));
}
let tombstones = vec![false; commit.len()];
Ok(MvccColumns::new(commit_ts, tombstones))
}
#[cfg(all(test, feature = "tokio"))]
mod tests {
    use std::sync::Arc;

    use arrow_array::{Int32Array, RecordBatch, StringArray, UInt64Array};
    use arrow_schema::{DataType, Field, Schema};
    use fusio::{disk::LocalFs, dynamic::DynFs, executor::NoopExecutor, path::Path};
    use fusio_parquet::writer::AsyncWriter;
    use futures::StreamExt;
    use parquet::{
        arrow::{AsyncArrowWriter, ProjectionMask},
        file::metadata::{PageIndexPolicy, ParquetMetaDataReader},
    };
    use tempfile::tempdir;

    use super::*;
    use crate::{
        inmem::immutable::memtable::MVCC_COMMIT_COL, ondisk::sstable::open_parquet_stream,
    };

    /// Creates (or opens for writing) `path` and writes `batch` into it as parquet.
    async fn write_batch(fs: Arc<dyn DynFs>, path: Path, batch: RecordBatch) {
        let options = fusio::fs::OpenOptions::default().create(true).write(true);
        let file = fs.open_options(&path, options).await.expect("open file");
        let writer = AsyncWriter::new(file, NoopExecutor);
        let mut arrow_writer =
            AsyncArrowWriter::try_new(writer, batch.schema(), None).expect("arrow writer");
        arrow_writer.write(&batch).await.expect("write batch");
        arrow_writer.close().await.expect("close");
    }

    /// Writes `(id, v, commit_ts)` rows as a data file with columns `id`, `v` and the
    /// MVCC commit column.
    async fn write_data_parquet(fs: Arc<dyn DynFs>, path: Path, rows: Vec<(&str, i32, u64)>) {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("v", DataType::Int32, false),
            Field::new(MVCC_COMMIT_COL, DataType::UInt64, false),
        ]));
        let ids: Vec<&str> = rows.iter().map(|(id, _, _)| *id).collect();
        let values: Vec<i32> = rows.iter().map(|(_, v, _)| *v).collect();
        let timestamps: Vec<u64> = rows.iter().map(|(_, _, ts)| *ts).collect();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(ids)),
                Arc::new(Int32Array::from(values)),
                Arc::new(UInt64Array::from(timestamps)),
            ],
        )
        .expect("batch");
        write_batch(fs, path, batch).await;
    }

    /// Like `write_data_parquet`, with an additional `extra: Int32` column before the
    /// commit column; rows are `(id, v, extra, commit_ts)`.
    async fn write_data_parquet_with_extra(
        fs: Arc<dyn DynFs>,
        path: Path,
        rows: Vec<(&str, i32, i32, u64)>,
    ) {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("v", DataType::Int32, false),
            Field::new("extra", DataType::Int32, false),
            Field::new(MVCC_COMMIT_COL, DataType::UInt64, false),
        ]));
        let ids: Vec<&str> = rows.iter().map(|(id, _, _, _)| *id).collect();
        let values: Vec<i32> = rows.iter().map(|(_, v, _, _)| *v).collect();
        let extras: Vec<i32> = rows.iter().map(|(_, _, extra, _)| *extra).collect();
        let timestamps: Vec<u64> = rows.iter().map(|(_, _, _, ts)| *ts).collect();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(ids)),
                Arc::new(Int32Array::from(values)),
                Arc::new(Int32Array::from(extras)),
                Arc::new(UInt64Array::from(timestamps)),
            ],
        )
        .expect("batch");
        write_batch(fs, path, batch).await;
    }

    /// Writes `(id, commit_ts)` deletes as a sidecar with columns `id` and the MVCC
    /// commit column.
    async fn write_delete_parquet(fs: Arc<dyn DynFs>, path: Path, deletes: Vec<(&str, u64)>) {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new(MVCC_COMMIT_COL, DataType::UInt64, false),
        ]));
        let ids: Vec<&str> = deletes.iter().map(|(id, _)| *id).collect();
        let timestamps: Vec<u64> = deletes.iter().map(|(_, ts)| *ts).collect();
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(ids)),
                Arc::new(UInt64Array::from(timestamps)),
            ],
        )
        .expect("batch");
        write_batch(fs, path, batch).await;
    }

    /// Opens `path` as an unprojected parquet stream; `context` is the panic message.
    async fn open_stream(
        fs: &Arc<dyn DynFs>,
        path: Path,
        context: &str,
    ) -> ParquetStream<NoopExecutor> {
        open_parquet_stream(Arc::clone(fs), path, None, None, None, None, NoopExecutor)
            .await
            .expect(context)
    }

    /// Schema seen by the data-side key projection: `id: Utf8`, `v: Int32`.
    fn user_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("v", DataType::Int32, false),
        ]))
    }

    /// Schema seen by the delete-side key projection: `id: Utf8`.
    fn key_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]))
    }

    /// Key projection over column 0 of `schema`; `context` is the panic message.
    fn key_projection(schema: &Arc<Schema>, context: &str) -> Arc<dyn KeyProjection> {
        crate::extractor::projection_for_field(Arc::clone(schema), 0)
            .expect(context)
            .into()
    }

    /// Drains `scan`, returning the UTF-8 keys of emitted rows and of emitted tombstones.
    async fn drain_keys(mut scan: SstableScan<NoopExecutor>) -> (Vec<String>, Vec<String>) {
        let mut rows = Vec::new();
        let mut tombstones = Vec::new();
        while let Some(entry) = scan.next().await {
            match entry.expect("entry") {
                SstableVisibleEntry::Row(row_ref) => {
                    let key = KeyOwned::from_key_row(row_ref.key_ts().key()).expect("key");
                    rows.push(key.as_utf8().expect("utf8").to_string());
                }
                SstableVisibleEntry::Tombstone(key_ts) => {
                    tombstones.push(key_ts.key().as_utf8().expect("utf8").to_string());
                }
            }
        }
        (rows, tombstones)
    }

    #[cfg_attr(feature = "tokio", tokio::test(flavor = "multi_thread"))]
    async fn parquet_projection_reads_key_and_commit_ts() {
        let tmpdir = tempdir().expect("tempdir");
        let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
        let root = Path::from(tmpdir.path().to_string_lossy().to_string());
        let data_path = root.child("data.parquet");
        write_data_parquet_with_extra(Arc::clone(&fs), data_path.clone(), vec![("a", 1, 10, 5)])
            .await;

        // Build a root-column mask that skips `extra` (column 2).
        let file = fs.open(&data_path).await.expect("open");
        let size = file.size().await.expect("size");
        let mut reader = AsyncReader::new(file, size, UnpinExec(NoopExecutor))
            .await
            .expect("reader");
        let metadata = ParquetMetaDataReader::new()
            .with_page_index_policy(PageIndexPolicy::Optional)
            .load_and_finish(&mut reader, size)
            .await
            .expect("metadata");
        let schema_descr = metadata.file_metadata().schema_descr();
        let mask = ProjectionMask::roots(schema_descr, vec![0, 1, 3]);

        let mut data_stream = open_parquet_stream(
            Arc::clone(&fs),
            data_path,
            Some(mask),
            None,
            None,
            None,
            NoopExecutor,
        )
        .await
        .expect("open stream");
        let batch = data_stream
            .next()
            .await
            .transpose()
            .expect("batch result")
            .expect("batch");
        let schema = batch.schema();
        let fields: Vec<&str> = schema
            .fields()
            .iter()
            .map(|field| field.name().as_str())
            .collect();
        assert_eq!(fields, vec!["id", "v", MVCC_COMMIT_COL]);
    }

    #[cfg_attr(feature = "tokio", tokio::test(flavor = "multi_thread"))]
    async fn read_ts_filters_future_data() {
        let tmpdir = tempdir().expect("tempdir");
        let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
        let root = Path::from(tmpdir.path().to_string_lossy().to_string());
        let extractor = key_projection(&user_schema(), "extractor");

        let data_path = root.child("data.parquet");
        write_data_parquet(
            Arc::clone(&fs),
            data_path.clone(),
            vec![
                ("a", 1, 10), // visible
                ("b", 2, 20), // visible: commit_ts == read_ts
                ("c", 3, 30), // invisible: committed after read_ts
            ],
        )
        .await;
        let data_stream = open_stream(&fs, data_path, "open stream").await;

        let read_ts = Timestamp::new(20);
        let projection_indices = vec![0, 1];
        let scan = SstableScan::<NoopExecutor>::new(
            data_stream,
            None, // no delete sidecar
            extractor,
            projection_indices,
            Some(Order::Asc),
            read_ts,
        );
        let (keys, _tombstones) = drain_keys(scan).await;
        assert_eq!(
            keys,
            vec!["a", "b"],
            "row 'c' at ts=30 should be invisible at read_ts=20"
        );
    }

    #[cfg_attr(feature = "tokio", tokio::test(flavor = "multi_thread"))]
    async fn delete_sidecar_hides_data() {
        let tmpdir = tempdir().expect("tempdir");
        let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
        let root = Path::from(tmpdir.path().to_string_lossy().to_string());
        let data_extractor = key_projection(&user_schema(), "extractor");
        let delete_extractor = key_projection(&key_schema(), "delete extractor");

        let data_path = root.child("data.parquet");
        write_data_parquet(
            Arc::clone(&fs),
            data_path.clone(),
            vec![
                ("a", 1, 10), // hidden by the newer delete at ts=15
                ("b", 2, 20), // no delete: stays visible
                ("c", 3, 30), // delete at the same ts wins
            ],
        )
        .await;
        let delete_path = root.child("delete.parquet");
        write_delete_parquet(
            Arc::clone(&fs),
            delete_path.clone(),
            vec![
                ("a", 15), // newer than a@10
                ("c", 30), // same ts as c@30
            ],
        )
        .await;
        let data_stream = open_stream(&fs, data_path, "open stream").await;
        let delete_stream = open_stream(&fs, delete_path, "delete stream").await;

        let read_ts = Timestamp::MAX;
        let projection_indices = vec![0, 1];
        let scan = SstableScan::<NoopExecutor>::new(
            data_stream,
            Some(DeleteStreamWithExtractor {
                stream: delete_stream,
                extractor: delete_extractor,
            }),
            data_extractor,
            projection_indices,
            Some(Order::Asc),
            read_ts,
        );
        let (keys, tombstones) = drain_keys(scan).await;
        assert_eq!(keys, vec!["b"], "'a' and 'c' should be hidden by deletes");
        assert!(
            tombstones.contains(&"a".to_string()),
            "tombstone for 'a' should be emitted"
        );
        assert!(
            tombstones.contains(&"c".to_string()),
            "tombstone for 'c' should be emitted (same-ts delete wins)"
        );
    }

    #[cfg_attr(feature = "tokio", tokio::test(flavor = "multi_thread"))]
    async fn source_level_dedup_emits_one_version_per_key() {
        let tmpdir = tempdir().expect("tempdir");
        let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
        let root = Path::from(tmpdir.path().to_string_lossy().to_string());
        let extractor = key_projection(&user_schema(), "extractor");

        let data_path = root.child("data.parquet");
        write_data_parquet(
            Arc::clone(&fs),
            data_path.clone(),
            vec![
                ("a", 100, 30), // newest version of 'a' comes first
                ("a", 10, 20),
                ("a", 1, 10),
                ("b", 2, 15),
            ],
        )
        .await;
        let data_stream = open_stream(&fs, data_path, "open stream").await;

        let read_ts = Timestamp::MAX;
        let projection_indices = vec![0, 1];
        let mut scan = SstableScan::<NoopExecutor>::new(
            data_stream,
            None, // no delete sidecar
            extractor,
            projection_indices,
            Some(Order::Asc),
            read_ts,
        );
        let mut results: Vec<(String, i32)> = Vec::new();
        while let Some(entry) = scan.next().await {
            if let SstableVisibleEntry::Row(row_ref) = entry.expect("entry") {
                let key = KeyOwned::from_key_row(row_ref.key_ts().key()).expect("key");
                let key_str = key.as_utf8().expect("utf8").to_string();
                let row = row_ref.into_row_owned().expect("row");
                let value = match &row.0[1] {
                    Some(typed_arrow_dyn::DynCell::I32(v)) => *v,
                    _ => panic!("expected i32 value"),
                };
                results.push((key_str, value));
            }
        }
        assert_eq!(
            results,
            vec![("a".to_string(), 100), ("b".to_string(), 2)],
            "should emit only latest version (v=100) for 'a'"
        );
    }

    #[cfg_attr(feature = "tokio", tokio::test(flavor = "multi_thread"))]
    async fn tombstone_only_keys_emitted() {
        let tmpdir = tempdir().expect("tempdir");
        let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
        let root = Path::from(tmpdir.path().to_string_lossy().to_string());
        let data_extractor = key_projection(&user_schema(), "extractor");
        let delete_extractor = key_projection(&key_schema(), "delete extractor");

        let data_path = root.child("data.parquet");
        write_data_parquet(
            Arc::clone(&fs),
            data_path.clone(),
            vec![("b", 2, 20)], // the only key with data
        )
        .await;
        let delete_path = root.child("delete.parquet");
        write_delete_parquet(
            Arc::clone(&fs),
            delete_path.clone(),
            vec![
                ("a", 10), // keys that exist only as deletes
                ("c", 30),
                ("d", 40),
            ],
        )
        .await;
        let data_stream = open_stream(&fs, data_path, "open stream").await;
        let delete_stream = open_stream(&fs, delete_path, "delete stream").await;

        let read_ts = Timestamp::MAX;
        let projection_indices = vec![0, 1];
        let scan = SstableScan::<NoopExecutor>::new(
            data_stream,
            Some(DeleteStreamWithExtractor {
                stream: delete_stream,
                extractor: delete_extractor,
            }),
            data_extractor,
            projection_indices,
            Some(Order::Asc),
            read_ts,
        );
        let (data_keys, tombstone_keys) = drain_keys(scan).await;
        assert_eq!(data_keys, vec!["b"], "only 'b' should have data");
        assert!(
            tombstone_keys.contains(&"a".to_string()),
            "tombstone 'a' should be emitted"
        );
        assert!(
            tombstone_keys.contains(&"c".to_string()),
            "tombstone 'c' should be emitted"
        );
        assert!(
            tombstone_keys.contains(&"d".to_string()),
            "tombstone 'd' should be emitted"
        );
    }
}