use std::{
cmp::{Ordering, Reverse},
collections::BinaryHeap,
pin::Pin,
task::{Context, Poll},
};
use fusio::executor::Executor;
use futures::{Stream, StreamExt, ready};
use pin_project_lite::pin_project;
use super::Order;
use crate::query::stream::{ScanStream, SourcePriority, StreamEntry, StreamError};
pin_project! {
/// Stream that merges several `ScanStream` sources into one sequence of entries.
///
/// The head entry of every source that still has data sits in `peeked`; entries are taken
/// from that heap one at a time and passed through `buf` so that repeated keys and
/// tombstones can be dropped before anything is yielded.
pub(crate) struct MergeStream<'t, E>
where
E: Executor,
{
// Source streams; heap entries refer to them by index into this vector.
streams: Vec<ScanStream<'t, E>>,
// Head entries of the sources, wrapped in `Reverse` so the smallest `HeapEntry` is on top.
peeked: BinaryHeap<Reverse<HeapEntry>>,
// Most recent entry taken from the heap, held back until an entry with a different key
// arrives or the heap runs dry.
buf: Option<StreamEntry>,
// Number of entries that may still be yielded; `None` means no limit is applied.
limit: Option<usize>,
// Ordering handed to every `HeapEntry` created by this stream.
order: Option<Order>,
// Priority of each source, captured at construction and indexed like `streams`.
stream_priority: Vec<SourcePriority>,
}
}
impl<'t, E> MergeStream<'t, E>
where
    E: Executor + Clone + 'static,
{
    /// Builds a merge stream over `streams`, priming it with the first entry of each source.
    ///
    /// Every source is asked for its priority and polled once; sources that are already
    /// exhausted contribute nothing to the heap. `limit` caps how many entries the merged
    /// stream may yield (`None` for no cap) and `order` is forwarded to each heap entry.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by a source while fetching its head entry.
    pub(crate) async fn from_vec(
        mut streams: Vec<ScanStream<'t, E>>,
        limit: Option<usize>,
        order: Option<Order>,
    ) -> Result<Self, StreamError> {
        let source_count = streams.len();
        let mut peeked = BinaryHeap::with_capacity(source_count);
        let mut stream_priority = Vec::with_capacity(source_count);

        for (stream_idx, source) in streams.iter_mut().enumerate() {
            // The priority is recorded even for sources that turn out to be empty, so that
            // `stream_priority` stays index-aligned with `streams`.
            let priority = source.priority();
            stream_priority.push(priority);

            let head = match source.next().await {
                Some(head) => head?,
                None => continue,
            };
            peeked.push(Reverse(HeapEntry::new(stream_idx, priority, head, order)));
        }

        Ok(Self {
            streams,
            peeked,
            buf: None,
            limit,
            order,
            stream_priority,
        })
    }
}
impl<'t, E> Stream for MergeStream<'t, E>
where
    E: Executor + Clone + 'static,
{
    type Item = Result<StreamEntry, StreamError>;

    /// Yields the next merged entry.
    ///
    /// Entries leave the heap smallest-first. Each one is parked in `buf`; a later entry with
    /// the same key as the parked one is discarded, and the parked entry is only emitted (if it
    /// is not a tombstone) once an entry with a different key replaces it or the heap is empty.
    /// Every emitted entry consumes one unit of `limit`; once the limit reaches zero the stream
    /// reports completion.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();

        if *this.limit == Some(0) {
            return Poll::Ready(None);
        }

        loop {
            let Some(Reverse(top)) = this.peeked.peek() else {
                break;
            };
            let stream_idx = top.stream_idx;

            // Fetch the successor from the winning source *before* popping, so that a pending
            // or failing source leaves the heap untouched.
            let polled = ready!(this.streams[stream_idx].poll_next_unpin(cx));
            let upcoming = polled.transpose()?;

            let current = this
                .peeked
                .pop()
                .map(|Reverse(head)| head.entry)
                .expect("heap non-empty after peek");

            if let Some(upcoming) = upcoming {
                let refill = HeapEntry::new(
                    stream_idx,
                    this.stream_priority[stream_idx],
                    upcoming,
                    *this.order,
                );
                this.peeked.push(Reverse(refill));
            }

            // An entry for the key already parked in `buf` loses to the parked one.
            if let Some(held) = this.buf.as_ref() {
                if held.key_eq(&current) {
                    continue;
                }
            }

            // A new key arrived: park it and release the previous entry unless it was a
            // tombstone (or there was no previous entry yet).
            match this.buf.replace(current) {
                Some(prev) if !prev.is_tombstone() => {
                    decrement_limit(this.limit);
                    return Poll::Ready(Some(Ok(prev)));
                }
                _ => {}
            }
        }

        // All sources are drained; flush the parked entry, dropping it if it is a tombstone.
        let tail = this.buf.take().filter(|entry| !entry.is_tombstone());
        if tail.is_some() {
            decrement_limit(this.limit);
        }
        Poll::Ready(tail.map(Ok))
    }
}
/// Element stored in the merge heap: one entry together with the data needed to rank it
/// against entries from other sources.
struct HeapEntry {
/// Index of the source stream (within `MergeStream::streams`) that produced `entry`.
stream_idx: usize,
/// Priority of the producing source; compared after key and timestamp.
source_priority: SourcePriority,
/// The entry itself.
entry: StreamEntry,
/// Requested ordering; `None` is treated as `Order::Asc` when comparing.
order: Option<Order>,
}
impl HeapEntry {
pub(crate) fn new(
stream_idx: usize,
priority: SourcePriority,
entry: StreamEntry,
order: Option<Order>,
) -> Self {
Self {
stream_idx,
source_priority: priority,
entry,
order,
}
}
}
impl Ord for HeapEntry {
    /// Ranks heap entries by, in decreasing precedence:
    ///
    /// 1. key, according to this entry's `order` (defaulting to ascending);
    /// 2. timestamp, with the larger timestamp ordered first;
    /// 3. source priority, ascending;
    /// 4. source stream index, ascending.
    fn cmp(&self, other: &Self) -> Ordering {
        let by_key = match self.order.unwrap_or(Order::Asc) {
            Order::Asc => self.entry.key_cmp(&other.entry),
        };

        by_key
            // Operands are swapped on purpose: larger timestamps compare as smaller.
            .then_with(|| other.entry.ts().cmp(&self.entry.ts()))
            .then_with(|| self.source_priority.cmp(&other.source_priority))
            .then_with(|| self.stream_idx.cmp(&other.stream_idx))
    }
}
impl PartialOrd for HeapEntry {
    /// Always `Some`: the ordering is total and is defined by the `Ord` implementation.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let total = Ord::cmp(self, other);
        Some(total)
    }
}
impl PartialEq for HeapEntry {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
}
}
// Marker impl: equality comes from the `PartialEq` impl, which delegates to `cmp`.
impl Eq for HeapEntry {}
/// Consumes one unit of an optional remaining-row budget.
///
/// `None` (no budget) is left untouched; `Some(n)` becomes `Some(n - 1)`, stopping at zero
/// instead of underflowing.
fn decrement_limit(limit: &mut Option<usize>) {
    *limit = limit.map(|remaining| remaining.saturating_sub(1));
}
// Tests for `MergeStream`; only compiled for test builds with the `tokio` feature enabled.
#[cfg(all(test, feature = "tokio"))]
mod tests {
use std::{collections::BTreeMap, sync::Arc};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use futures::StreamExt;
use typed_arrow_dyn::{DynCell, DynRow};
use super::*;
use crate::{
extractor::{KeyProjection, projection_for_columns, projection_for_field},
inmem::{immutable::memtable::ImmutableMemTable, mutable::memtable::DynMem},
key::KeyOwned,
mutation::DynMutation,
mvcc::{MVCC_COMMIT_COL, Timestamp},
query::stream::{OwnedImmutableScan, OwnedMutableScan, ScanStream},
test::build_batch,
transaction::TransactionScan,
};
/// Creates a `DynMem` for `schema`, keyed on column 0, with a delete projection built over
/// a single-column (`id: Utf8`) schema.
fn make_test_mem(schema: SchemaRef) -> DynMem {
let extractor: Arc<dyn KeyProjection> = projection_for_field(schema.clone(), 0)
.expect("extractor")
.into();
let delete_projection: Arc<dyn KeyProjection> = projection_for_columns(
Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)])),
vec![0],
)
.expect("delete projection")
.into();
DynMem::new(schema, extractor, delete_projection)
}
/// Merges an immutable scan, a mutable scan and two transaction scans whose keys overlap,
/// and checks that exactly one row per key is emitted, in ascending key order.
#[tokio::test(flavor = "multi_thread")]
async fn merge_stream_prefers_higher_priority_for_same_key() {
// Builds the fixture, runs the merge with the given order and returns
// `(key, value, timestamp)` for every non-tombstone entry that was yielded.
async fn run_merge(order: Order) -> Vec<(String, i64, u64)> {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int64, true),
]));
let mutable = make_test_mem(schema.clone());
let immutable_builder = make_test_mem(schema.clone());
// Inserts a single `(key, value)` row into `table` at timestamp `ts`.
let insert_row = |table: &DynMem, key: &str, value: i64, ts: u64| {
let batch = build_batch(
schema.clone(),
vec![DynRow(vec![
Some(DynCell::Str(key.into())),
Some(DynCell::I64(value)),
])],
)
.expect("batch");
table
.insert_batch(batch, Timestamp::new(ts))
.expect("insert");
};
// Overlaps between the two memtables:
//   "a": same timestamp (50) in both,
//   "c": immutable copy is newer (40 vs 10),
//   "d": same timestamp (60) in both, and also staged in the transactions below.
insert_row(&mutable, "a", 2, 50);
insert_row(&mutable, "b", 5, 20);
insert_row(&mutable, "c", 10, 10);
insert_row(&mutable, "d", 100, 60);
insert_row(&immutable_builder, "a", 1, 50);
insert_row(&immutable_builder, "c", 40, 40);
insert_row(&immutable_builder, "d", 25, 60);
let immutable_segment: ImmutableMemTable = immutable_builder
.seal_now()
.expect("seal ok")
.expect("segment");
let mutable_guard = mutable.read();
let mutable_owned_scan =
OwnedMutableScan::from_guard(mutable_guard, None, Timestamp::MAX)
.expect("mutable scan");
let immutable_segment = Arc::new(immutable_segment);
let immutable_scan = immutable_segment
.scan_visible(None, Timestamp::MAX)
.expect("immutable scan");
// Two transaction scans staging the identical upsert of "d" -> 500 at timestamp 60.
let mut staged_txn1 = BTreeMap::new();
staged_txn1.insert(
KeyOwned::from("d"),
DynMutation::Upsert(DynRow(vec![
Some(DynCell::Str("d".into())),
Some(DynCell::I64(500)),
])),
);
let txn_scan1 = TransactionScan::new(&staged_txn1, &schema, Timestamp::new(60), None)
.expect("txn scan");
let mut staged_txn2 = BTreeMap::new();
staged_txn2.insert(
KeyOwned::from("d"),
DynMutation::Upsert(DynRow(vec![
Some(DynCell::Str("d".into())),
Some(DynCell::I64(500)),
])),
);
let txn_scan2 = TransactionScan::new(&staged_txn2, &schema, Timestamp::new(60), None)
.expect("txn scan");
// Stream indices: 0 = immutable, 1 = mutable, 2 and 3 = transaction scans.
let streams = vec![
ScanStream::<'_, fusio::executor::NoopExecutor>::from(OwnedImmutableScan::new(
Arc::clone(&immutable_segment),
immutable_scan,
)),
ScanStream::<'_, fusio::executor::NoopExecutor>::from(mutable_owned_scan),
ScanStream::<'_, fusio::executor::NoopExecutor>::from(txn_scan1),
ScanStream::<'_, fusio::executor::NoopExecutor>::from(txn_scan2),
];
let mut merge = MergeStream::from_vec(streams, None, Some(order))
.await
.expect("merge built");
let mut rows = Vec::new();
while let Some(entry) = merge.next().await {
let entry = entry.expect("entry ok");
// Normalise every live variant to (key, timestamp, owned row); tombstones are skipped.
let (key, ts, row) = match entry {
StreamEntry::Txn((key_ts, row)) => (
key_ts.key().to_owned(),
key_ts.timestamp(),
row.into_owned().expect("row"),
),
StreamEntry::MemTable((key_ts, row)) => (
key_ts.key().to_owned(),
key_ts.timestamp(),
row.into_owned().expect("row"),
),
StreamEntry::Sstable(row_ref) => {
let key_ts = row_ref.key_ts();
(
key_ts.key().to_owned(),
key_ts.timestamp(),
row_ref.into_row_owned().expect("row"),
)
}
StreamEntry::TxnTombstone(_)
| StreamEntry::MemTableTombstone(_)
| StreamEntry::SstableTombstone(_) => continue,
};
let key_str = key.as_utf8().expect("utf8 key").to_string();
// Column 1 is the Int64 value column "v".
let value = row.0[1]
.as_ref()
.and_then(|cell| match cell {
DynCell::I64(v) => Some(*v),
_ => None,
})
.expect("int value");
rows.push((key_str, value, ts.get()));
}
rows
}
let asc = run_merge(Order::Asc).await;
// Expected winners: "a" -> mutable copy (timestamps tie), "c" -> immutable copy (newer
// timestamp), "d" -> the transaction value (timestamps tie across all sources).
assert_eq!(
asc,
vec![
("a".to_string(), 2, 50),
("b".to_string(), 5, 20),
("c".to_string(), 40, 40),
("d".to_string(), 500, 60),
],
"ascending order should emit keys from smallest to largest, preferring newer \
timestamps and higher priority sources",
);
}
/// Checks that a row in the immutable memtable is not emitted when the mutable memtable
/// holds a delete for the same key with a later commit timestamp.
#[tokio::test(flavor = "multi_thread")]
async fn merge_stream_hides_rows_for_tombstoned_keys() {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int64, false),
]));
let mutable = make_test_mem(schema.clone());
let immutable_builder = make_test_mem(schema.clone());
// Inserts a single `(key, value)` row into `table` at timestamp `ts`.
let insert_row = |table: &DynMem, key: &str, value: i64, ts: u64| {
let batch = build_batch(
schema.clone(),
vec![DynRow(vec![
Some(DynCell::Str(key.into())),
Some(DynCell::I64(value)),
])],
)
.expect("batch");
table
.insert_batch(batch, Timestamp::new(ts))
.expect("insert");
};
insert_row(&immutable_builder, "ghost", 10, 10);
insert_row(&immutable_builder, "keep-immutable", 25, 12);
let immutable_segment: ImmutableMemTable = immutable_builder
.seal_now()
.expect("seal ok")
.expect("segment");
insert_row(&mutable, "mutable-only", 99, 40);
// Delete batches carry the key column plus the commit-timestamp column; this one deletes
// "ghost" at 30, after its insert at 10 in the immutable table.
let delete_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new(MVCC_COMMIT_COL, DataType::UInt64, false),
]));
let delete_rows = vec![DynRow(vec![
Some(DynCell::Str("ghost".into())),
Some(DynCell::U64(30)),
])];
let delete_batch = build_batch(delete_schema, delete_rows).expect("delete batch");
mutable
.insert_delete_batch(delete_batch)
.expect("delete row");
let immutable_segment = Arc::new(immutable_segment);
let immutable_scan = immutable_segment
.scan_visible(None, Timestamp::MAX)
.expect("immutable scan");
let mutable_guard = mutable.read();
let mutable_owned_scan = OwnedMutableScan::from_guard(mutable_guard, None, Timestamp::MAX)
.expect("mutable scan");
let streams = vec![
ScanStream::<'_, fusio::executor::NoopExecutor>::from(OwnedImmutableScan::new(
Arc::clone(&immutable_segment),
immutable_scan,
)),
ScanStream::<'_, fusio::executor::NoopExecutor>::from(mutable_owned_scan),
];
let mut merge = MergeStream::from_vec(streams, None, Some(Order::Asc))
.await
.expect("merge built");
let mut rows = Vec::new();
while let Some(entry) = merge.next().await {
let entry = entry.expect("entry ok");
// Normalise every live variant to (key, owned row); tombstones are skipped.
let (key, row) = match entry {
StreamEntry::Txn((key_ts, row)) => {
(key_ts.key().to_owned(), row.into_owned().expect("row"))
}
StreamEntry::MemTable((key_ts, row)) => {
(key_ts.key().to_owned(), row.into_owned().expect("row"))
}
StreamEntry::Sstable(row_ref) => (
row_ref.key_ts().key().to_owned(),
row_ref.into_row_owned().expect("row"),
),
StreamEntry::TxnTombstone(_)
| StreamEntry::MemTableTombstone(_)
| StreamEntry::SstableTombstone(_) => continue,
};
let key_str = key.as_utf8().expect("utf8 key").to_string();
// Column 1 is the Int64 value column "v".
let value = row.0[1]
.as_ref()
.and_then(|cell| match cell {
DynCell::I64(v) => Some(*v),
_ => None,
})
.expect("i64 value");
rows.push((key_str, value));
}
assert_eq!(
rows,
vec![
("keep-immutable".to_string(), 25),
("mutable-only".to_string(), 99)
],
"ghost should be suppressed because the mutable tombstone is newer"
);
}
}